以太坊Geth中P2P子模块的源码解析

TCP模块

TCP模块是用来传输依赖于稳定传输的数据,具体细分为模拟远程对等点的peer类、监听收到的连接请求的TCP Listener、调度主动连接请求的dialScheduler类。

- 对等点peer

“对等点(peer)”是对远程终端的一个抽象,与远程终端节点通信就是通过这个对象。代码在p2p/peer.go中。
从peer的核心函数就能粗略地看到peer在负责什么样的职能:(1)启动一个协程不断监听对方传来的消息;(2)启动一个协程定时ping对方,检测连接的活性;(3)启动本节点和对方节点之间的所有协议(这里的协议是指类似ping-pong之类的协议,下文会具体介绍)。

func (p *Peer) run() (remoteRequested bool, err error) {
   var (
      writeStart = make(chan struct{}, 1)
      writeErr   = make(chan error, 1)
      readErr    = make(chan error, 1)
   ...
   )
   p.wg.Add(2)
   // 监听所有消息的循环函数
   go p.readLoop(readErr)
   // 定时ping的循环函数
   go p.pingLoop()
   ...
   // 启动协议的函数
   p.startProtocols(writeStart, writeErr)

   // 下面的for循环就是在做错误处理
loop:
   for {
      select {
      case err = <-writeErr:
         ...
      case err = <-readErr:
         ...
      case err = <-p.protoErr:
         ...
      case err = <-p.disc:
         ...
      }
   }
   ...
   p.wg.Wait()
   ...
}

func (p *Peer) readLoop(errc chan<- error) {
   defer p.wg.Done()
   for {
      // 从底层TCP连接中读取消息
      // 这里peer的成员rw是一个TCP连接的封装,下文具体介绍
      msg, err := p.rw.ReadMsg()
      if err != nil {
         // 如果发生错误,则从channel传给peer的主协程处理
         errc <- err
         return
      }
      ...
      // 如果没有错误,则调用handle函数处理收到的消息,
      // 在handle函数出现错误时,则也通过channel传给peer的主协程处理
      // handle函数的具体内容会在下文展开
      if err = p.handle(msg); err != nil {
         errc <- err
         return
      }
   }
}

func (p *Peer) pingLoop() {
   ping := time.NewTimer(pingInterval)
   defer p.wg.Done()
   defer ping.Stop()
   for {
      select {
      case <-ping.C:
         // 定时ping对等节点,如果发生错误,则通过protoErr这个channel
         // 传给peer的主协程处理
         if err := SendItems(p.rw, pingMsg); err != nil {
            p.protoErr <- err
            return
         }
         ...
         ...
      }
   }
}

func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) {
   p.wg.Add(len(p.running))
   for _, proto := range p.running {
      ...
      var rw MsgReadWriter = proto
      go func() {
         defer p.wg.Done()
         // 调用每个协议的Run函数,如果一切正常,则协议的Run函数会一直
         // 占据着这个协程,直至这个peer结束或是有其他错误出现;否则,
         // 返回一个错误
         // 下文会具体介绍“协议”这个对象
         err := proto.Run(p, rw)
         ...
         // 如果发生了错误,则通过protoErr这个channel传给peer的主
         // 协程处理
         p.protoErr <- err
      }()
   }
}

这里我们主要要关注3个点:在实际收发消息时是如何调用net.Conn接口的(如何封装net.Conn的)?处理消息时的handle函数做了些什么?“协议”这个对象到底是什么?
先看底层的传输问题,peer对象是如何调用net.Conn接口来实现收发消息的?peer对象收发消息的原理并不复杂,它就是调用了net.Conn的接口,底层通过TCP传输。但麻烦的点在于它的调用关系,这里面涉及了多个类,层级关系稍微有点复杂。
peer所有的收发操作都是通过成员对象rw,而rw又是conn类,conn类除了包含当前连接的信息、对方节点的信息、双方支持的协议等等之外,还有一点很重要的是它是transport类的扩展,transport类是真正封装net.Conn实现消息收发功能的。

type Peer struct {
   rw      *conn
   ...
}

// p2p/server.go
type conn struct {
   fd net.Conn
   transport
   node  *enode.Node
   flags connFlag
   cont  chan error
   caps  []Cap
   name  string
}

transport类是一个interface,它扩展了MsgReadWriter类,MsgReadWriter类也是一个interface,拥有收/发消息的接口。此外,从transport的定义中也不难看出Geth底层的TCP传输还会额外将消息进行加密。

// p2p/message.go
type MsgReader interface {
   ReadMsg() (Msg, error)
}

type MsgWriter interface {
   WriteMsg(Msg) error
}

type MsgReadWriter interface {
   MsgReader
   MsgWriter
}

// p2p/server.go
type transport interface {
   // The two handshakes.
   doEncHandshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error)
   doProtoHandshake(our *protoHandshake) (*protoHandshake, error)
   MsgReadWriter
   close(err error)
}

现在代码中transport类的具体实现是rlpxTransport类(代码在p2p/transport.go中),它不仅实现了transport类的两个握手接口,也实现了MsgReadWriter类的收/发消息接口。下面代码可以看出rlpxTransport类做的所有事情都是使用它的成员变量conn。

// p2p/server.go
func (srv *Server) Start() (err error) {
   ...
   // 现在代码中transport类的具体实现是rlpxTransport类
   srv.newTransport = newRLPX
   ...
}

// p2p/transport.go
type rlpxTransport struct {
   rmu, wmu sync.Mutex
   conn     *rlpx.Conn
   ...
}

func (t *rlpxTransport) ReadMsg() (Msg, error) {
   t.rmu.Lock()
   defer t.rmu.Unlock()
   ...
   code, data, wireSize, err := t.conn.Read()
   ...
}

func (t *rlpxTransport) WriteMsg(msg Msg) error {
   t.wmu.Lock()
   defer t.wmu.Unlock()
   ...
   size, err := t.conn.Write(msg.Code, t.wbuf.Bytes())
   ...
}

func (t *rlpxTransport) doEncHandshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) {
   ...
   return t.conn.Handshake(prv)
}

func (t *rlpxTransport) doProtoHandshake(our *protoHandshake) (their *protoHandshake, err error) {
   ...
   // Send函数内部调用的是WriteMsg函数来发送消息
   go func() { werr <- Send(t, handshakeMsg, our) }()
   ...
}

成员变量conn是rlpx.Conn类(代码在p2p/rlpx/rlpx.go),它内部最主要的成员就是session,收/发消息和握手的操作都是通过这个成员来完成的。

type Conn struct {
   dialDest *ecdsa.PublicKey
   conn     net.Conn
   session  *sessionState
   ...
}

func (c *Conn) Read() (code uint64, data []byte, wireSize int, err error) {
   ...
   frame, err := c.session.readFrame(c.conn)
   ...
}

func (c *Conn) Write(code uint64, data []byte) (uint32, error) {
   ...
   err := c.session.writeFrame(c.conn, code, data)
   ...
}

session是sessionState类,这个类是两个节点间会话的一个抽象,负责执行两节点间的握手、收/发消息、消息的加密解密,同时也会保存握手密钥等信息。sessionState以帧的形式来传输数据,分为帧头、数据区2个部分。帧头标明了该帧数据区的大小DataSize,和对DataSize做的16byte MAC码;数据区包含了数据的类型码(表明当前消息是什么类型的,如:ping、pong等等),实际的消息数据,和对消息做的16byte MAC码。sessionState就是这条链路上的最底层了,它直接调用了net.Conn的接口来传输数据。
5.jpg

// 这里就不贴出sessionState传输数据的所有代码了,就简单附上sessionState调用
// net.Conn的地方,表明sessionState是直接调用net.Conn的接口的
// p2p/rlpx/rlpx.go
func (h *sessionState) readFrame(conn io.Reader) ([]byte, error) {
   ...
   // 这里的conn变量是从rlpx.Conn类传进来的net.Conn变量
   frame, err := h.rbuf.read(conn, int(rsize))
   ...
}

func (h *sessionState) writeFrame(conn io.Writer, code uint64, data []byte) error {
   ...
   // 这里的conn变量是从rlpx.Conn类传进来的net.Conn变量
   _, err := conn.Write(h.wbuf.data)
   ...
}

总结一下:peer对象是通过conn对象来传输消息的;conn对象包含了当前连接的信息、对方节点的信息、双方支持的协议等等,同时conn调用了transport这个interface来传输消息;rlpxTransport类实现了transport的接口,它调用rlpx.Conn类来传输消息;rlpx.Conn也是对连接的一个抽象,它主要负责把底层的数据帧转换成数据结构供上层使用,它调用sessionState来传输数据;sessionState是对一个会话的抽象,负责执行两节点间的握手、收/发消息、消息的加密解密,同时也会保存握手密钥等信息,它就是这个链路上的最底端,直接调用了net.Conn接口。
第二个问题,处理消息时的handle函数做了些什么?从下面代码段中可以看到,对于基础的协议类型,peer只处理ping和断开连接两种,做法分别是回复pong消息和实施断开操作。对于非基础类型的协议,即外部定义的协议,peer先会根据消息的code值获取相应的协议,然后将消息推送到协议的输入channel:

const (
   // 基础的协议类型
   handshakeMsg = 0x00
   discMsg      = 0x01
   pingMsg      = 0x02
   pongMsg      = 0x03
)

func (p *Peer) handle(msg Msg) error {
   switch {
   case msg.Code == pingMsg:
      ...
      go SendItems(p.rw, pongMsg)
   case msg.Code == discMsg:
      // 这里返回m.R后调用者就会断开连接
      var m struct{ R DiscReason }
      rlp.Decode(msg.Payload, &m)
      return m.R
   case msg.Code < baseProtocolLength:
      // 其他基础类型的协议就直接丢弃
      return msg.Discard()
   default:
      // 根据消息的code获取对应的自定义协议
      proto, err := p.getProto(msg.Code)
      ...
      // 将消息推送到协议的输入channel
      proto.in <- msg
      ...
   }
   return nil
}

这里很自然的引出第三个问题,这里的什么是“协议”?“协议”就是能对某些特定的消息做出特定的回应的对象,例如一个名为“My Ping-Pong”的协议,当它收到“My Ping”类型的消息后能回复一个“My Pong”的回应。下面是“协议”Protocol类的定义,除了协议名协议版本等基本信息外,核心函数是Run函数,这是一个在子协程常驻的函数,用于接收和回复消息:

// p2p/protocol.go
type Protocol struct {
   Name string
   Version uint
   ...
   Run func(peer *Peer, rw MsgReadWriter) error
   NodeInfo func() interface{}
   PeerInfo func(id enode.ID) interface{}
   ...
}

peer对象中用的“协议”对象是Protocol类的扩展版,里面添加了用于接收输入的channel、传出error的channel和传出网络消息的MsgWriter。MagWriter就是上文介绍的用于传输网络消息的接口,实际实现是rlpxTransport类。Peer.handle中会将非基础类型的消息传进in这个channel中,Peer.run会监听werr这个channel传出的错误。此外还值得一提的是wstart这个channel,这个channel容量是1,所有协议共享一份实例,用于将协议的消息传出串行化:

type protoRW struct {
   Protocol
   in     chan Msg    // 输入的channel
   closed <-chan struct{}
   wstart <-chan struct{}
   werr   chan<- error // 输出error的channel
   ...
   w      MsgWriter   // 上文介绍的MsgWriter接口,用于将回复消息传出去
}

peer对象和它的协议之间的关系会像这样:
6.jpg

- TCP Listener

TCP Listener是监听远程发来的连接请求,用的直接就是net.Listen接口,然后将建立的连接封装成peer对象进行管理。具体的转换过程是由Server类来实现的,留到后面介绍。

- 连接调度器dialScheduler

dialScheduler代码在p2p/dial.go中。dialScheduler主要的工作就是在一定的数量范围内不断地主动发起连接,连到别的以太坊节点,当连接数量超过阈值则不再主动发起。建立好的连接会被封装成一个peer对象储存起来,供后续使用。此外dialScheduler还会维护一些固定的节点连接,这些节点来自于配置文件,当连接断开或者是出错时会立即开始重连操作。
dialScheduler的核心逻辑就在它的loop函数中:(1)从另一个异步循环readNodes()中(readNodes()会以一个相对随机的方式将节点信息传入nodesIn channel中。这个函数下文介绍)获取需要连接的节点,然后对这个节点发起连接(连接前会先判断连接数量是否达到上限);(2)维护“正在连接中”、“已经连接完成”的列表,用来防止重复连接到某个节点上;(3)维护固定连接的列表,并定时向这些固定连接发起心跳包。
dialScheduler中的连接操作是直接调用net包的接口来建立一个net.conn对象,这个net.conn对象会被封装成一个peer对象供别的模块使用。

func (d *dialScheduler) loop(it enode.Iterator) {
   ...
   for {
      ...
      select {
      // nodesIn中的节点来自于另一个异步循环readNodes(),
      // readNodes()会以一种相对随机的方式传入节点信息,
      // loop()函数收到节点信息后就发起异步连接
      case node := <-nodesIn:
         ...
         // 这里会调用net包进行连接,返回net.conn对象
         d.startDial(newDialTask(node, dynDialedConn))

      // 连接建立完成后把该连接从“正在连接中”的列表中删除
      case task := <-d.doneCh:
         id := task.dest.ID()
         delete(d.dialing, id)
         ...

      // 连接完成后net.conn对象会被封装成peer对象,这里记录
      // 所有已经连上的对等点peer,以免后续重复连接
      case c := <-d.addPeerCh:
         ...
         id := c.node.ID()
         // 只记录当前id已经连接上了
         d.peers[id] = struct{}{}
         ...

      // 当和一个对等点断开连接后,需要将该对等点id从
      // “已经连接上的对等点id”列表中删除
      case c := <-d.remPeerCh:
         ...
         delete(d.peers, c.node.ID())
         ...

      // static dial就是那些来自配置文件
      // 的固定连接
      case node := <-d.addStaticCh:
         id := node.ID()
         ...
         task := newDialTask(node, staticDialedConn)
         d.static[id] = task
         ...

      // 从固定连接中删除某个连接
      case node := <-d.remStaticCh:
         id := node.ID()
         ...
         delete(d.static, id)
         ...

      // 这个case是用来维护固定连接static dial的,
      // 定时向固定连接发生心跳包
      case <-historyExp:
         ...
      case <-d.ctx.Done():
         ...
      }
   }
   ...
}

dialScheduler会在另外一个协程中启动readNodes函数,readNodes函数内部一个循环不断获取节点信息并将获取到的节点信息传入nodesIn channel中,给到loop函数进行连接。

func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupFunc) *dialScheduler {
   ...
   go d.readNodes(it)
   go d.loop(it)
   ...
}


func (d *dialScheduler) readNodes(it enode.Iterator) {
   defer d.wg.Done()
   for it.Next() {
      select {
      case d.nodesIn <- it.Node():
      case <-d.ctx.Done():
      }
   }
}

readNodes()是从一个迭代器中一直读取next来获取新节点的,这个迭代器是对UDP模块的一个封装,它会不断让UDP模块寻找一个随机ID的节点,并用loopupIterator去接收寻找过程中每一次收到的节点列表,最后从loopupIterator中读出所有节点信息。当一次寻找节点操作结束后就又开始下一次寻找随机ID的操作,以此往复就能不断获得以较随机的方式获取新的节点信息。

// p2p/server.go
func (srv *Server) setupDiscovery() error {
   ...
   ntab, err := discover.ListenV4(conn, srv.localnode, cfg)
   ...
   srv.discmix.AddSource(ntab.RandomNodes())
   ...
}
func (srv *Server) setupDialScheduler() {
   ...
   srv.dialsched = newDialScheduler(config, srv.discmix, srv.SetupConn)
   ...
}

// p2p/discover/v4_udp.go
func (t *UDPv4) RandomNodes() enode.Iterator {
   return newLookupIterator(t.closeCtx, t.newRandomLookup)
}

Server类

Server类代码在p2p/server.go中,它是对整个p2p模块的一个抽象,Geth把整个p2p模块看作一个服务器,这个服务器内部又有类Kademlia算法、与其他以太坊节点的连接、本地节点网络信息等等等等。因此Server类会调用上文提到的所有类,并对外提供相对抽象的接口。
外部想启动一个P2P模块就调用Server类的Start函数,这将启动一个Server对象,Server对象在启动时便会加载各个子模块:

  • 从数据库中加载本地节点localNode的信息
  • 启动TCP Listener,实际调用的是net.Listen()函数来监听
  • 启动UDP模块
  • 启动dialScheduler开始主动连接别的节点

这些子模块在被加载后就会自动开始工作:监听别的节点发来的TCP连接请求;开始维护Kademlia节点列表;开始主动向随机节点发起TCP连接。Start函数调用run函数开启Server的主循环。

func (srv *Server) Start() (err error) {
   ...
   srv.listenFunc = net.Listen
   ...
   // 从数据库中加载本地节点的信息
   if err := srv.setupLocalNode(); err != nil {
      return err
   }
   if srv.ListenAddr != "" {
      // 启动TCP Listener
      if err := srv.setupListening(); err != nil {
         return err
      }
   }
   // 启动UDP模块
   if err := srv.setupDiscovery(); err != nil {
      return err
   }
   // 启动连接调度器,开始主动连接别的节点
   srv.setupDialScheduler()
   ...
   // 异步协程开启Server的主循环
   go srv.run()
   ...
}
func (srv *Server) setupLocalNode() error {
   ...
   db, err := enode.OpenDB(srv.Config.NodeDatabase)
   ...
   srv.localnode = enode.NewLocalNode(db, srv.PrivateKey)
   ...
}
func (srv *Server) setupListening() error {
   ...
   listener, err := srv.listenFunc("tcp", srv.ListenAddr)
   srv.listener = listener
   ...
   go srv.listenLoop()
   ...
}
func (srv *Server) listenLoop() {
   ...
   for {
      ...
      fd, err = srv.listener.Accept()
      ...
   }
}
func (srv *Server) setupDiscovery() error {
   ...
   ntab, err := discover.ListenV4(conn, srv.localnode, cfg)
   ...
   srv.ntab = ntab
   ...
   srv.DiscV5, err = discover.ListenV5(conn, srv.localnode, cfg)
   ...
}
func (srv *Server) setupDialScheduler() {
   ...
   srv.dialsched = newDialScheduler(config, srv.discmix, srv.SetupConn)
   ...
}

run()循环主要在维护一个peer列表,存储着所有已经连接上的对等点。这个peer列表是run()函数的局部变量,这也表示着这些peer对象的声明周期直到run()结束(run结束也代表着Server关闭)。外部如果也想对这些peer做一些操作的话可以把回调函数送进peerOp channel,在run()循环中会调用这个回调函数,参数是peer列表:

func (srv *Server) run() {
   ...
   var (
      peers        = make(map[enode.ID]*Peer)
      ...
   )
   ...
running:
   for {
      select {
      case <-srv.quit:
         break running

      case n := <-srv.addtrusted:
         ...

      case n := <-srv.removetrusted:
         ...

      case op := <-srv.peerOp:
         op(peers)
         ...

      case c := <-srv.checkpointPostHandshake:
         ...

      case c := <-srv.checkpointAddPeer:
         ...

      case pd := <-srv.delpeer:
         ...
         delete(peers, pd.ID())
         ...
      }
   }
   ...
   for _, p := range peers {
      p.Disconnect(DiscQuitting)
   }
   ...
}

外部若想添加P2P模块支持的协议,即添加对等点Peer之间的通信协议,就将Protocol对象追加到Server的Protocols字段中。Server的Protocols字段是对外可见的,可以直接读写:

func (srv *Server) launchPeer(c *conn) *Peer {
   // 创建Peer对象时将Server的Protocols字段作为参数,
   // 表明支持哪些协议
   p := newPeer(srv.log, c, srv.Protocols)
   ...
   go srv.runPeer(p)
   return p
}

外部想关闭一个Server就调用Stop函数结束Server的生命周期:

func (srv *Server) Stop() {
   ...
   close(srv.quit)
   ...
   srv.loopWG.Wait()
}