以太坊Geth中P2P子模块的源码解析

总览

Geth的P2P模块在p2p/目录下,它在本地建立和维护一个服务器,它的所有行为都围绕着这个服务器和其他节点上的服务器之间的连接。
1.jpg
Geth网络构建在TCP/UDP协议之上:上图的UDP模块就是我们收悉的Kademlia算法,它负责发现节点,并维护那张“K桶表”;TCP模块用于传输更依赖于稳定性的数据,TCP模块的节点信息来自于UDP模块维护的“K桶表”。

P2P节点信息

Geth用Node类来抽象一个节点的信息,如:节点ID、节点IP等,代码在p2p/enode/和p2p/enr/下。Node类用一个字段来存储节点ID,这个ID由节点的公钥转换而来的;此外还有一个r字段来存放这个节点的档案(record)信息。
档案里以键值对的形式来存放信息(即pairs字段),如:节点公钥、节点IP地址、节点UDP/TCP端口等。signature字段是这个档案的签名,需要与pairs中存的公钥相对应。raw是经过RLP(Recursive Length Prefix)编码后的档案信息。这些信息不会是一成不变的,例如IP、端口等信息就可能会发生变化,为了跟踪节点信息的版本更新设立了seq字段,每更新一次信息seq序列号就是会加一,因此seq值越大则版本越新。

// pair is a key/value pair in a record.
type pair struct {
   k string
   v rlp.RawValue
}

// Record represents a node record. The zero value is an empty record.
type Record struct {
   seq       uint64
   signature []byte
   raw       []byte
   pairs     []pair // sorted list of all key/value pairs
}

// Node represents a host on the network.
type Node struct {
   r  enr.Record
   id ID
}

用seq来跟踪节点信息的版本有一个问题:在启动以太坊客户端时,当前节点的很多节点信息可能都发生了变化,如节点IP、端口等,此时需要用一个更大的seq值来表明当前是最新版本,如果当前的主机上没有存有任何数据,那怎么获取之前档案版本的seq值呢?换句话说,怎么保证当前生成的seq值就是最大的? 为了解决这个问题,Geth用了这样一个机制:(1)如果本地存有上次的seq值,则将基于之前的seq值做增长;如果没有,则在初始化时将seq设置为当前的毫秒时间戳。(2)节点信息的更新只能一毫秒更新一次。有了以上两点再加上以太坊的时间系统时基于NTP的,所以基本能保证每次初始化时的seq都是最大最新的。

// p2p/enode/localnode.go

func (ln *LocalNode) Node() *Node {
   ...

   // The initial sequence number is the current timestamp in milliseconds. To ensure
   // that the initial sequence number will always be higher than any previous sequence
   // number (assuming the clock is correct), we want to avoid updating the record faster
   // than once per ms. So we need to sleep here until the next possible update time has
   // arrived.
   lastChange := time.Since(ln.update)
   if lastChange < recordUpdateThrottle {
      time.Sleep(recordUpdateThrottle - lastChange)
   }

   ...
}

- NodeDB

NodeDB是存放节点信息的数据库,是levelDB的一个简单的封装,支持内存或者硬盘存储。它内部以键值对的形式存信息,除了存正儿八经的节点信息外,还存连某个节点失败多少次之类的一些杂七杂八的。

- LocalNode

LocalNode是本地节点的一个抽象。LocalNode和Node两个结构体的关系:LocalNode是本地存节自身点信息的一个类型,它更像是对“本地节点”的一个抽象,里面除了存节点信息还存私钥等等。Node是以太坊传输节点信息的通用接口类型,LocalNode里的信息需要转换成Node类型才能往外传输。

type LocalNode struct {
   cur       atomic.Value // 存有最新档案(record)的Node对象的指针
   id        ID
   key       *ecdsa.PrivateKey // 私钥
   db        *DB               // 存放信息的数据库,现在用的是NodeDB
   mu        sync.RWMutex
   seq       uint64    // 自身的seq值
   update    time.Time // 上一次更新自己的节点信息的时间
   entries   map[string]enr.Entry // 自己的节点信息键值对
   endpoint4 lnEndpoint   // 用于预测自己外网的IPv4
   endpoint6 lnEndpoint   // 用于预测自己的外网IPv6
}

LocalNode中的endpoint4和endpoint6是用来预测外网IP的,这个是当自身在NAT之后时,需要知道自己的外网IP。思路是统计所有发给自己的网络数据包的目标IP,得票最多的IP就是自己的外网IP。显然,这个思路太简单了,Geth中也没有用这样预测出来的结果。当自身在NAT之后时,Geth用的是第三方库来获取外网IP和映射端口号,现在支持pmp和upnp,封装第三方库的代码在p2p/nat/下。

UDP模块

UDP模块整体都在p2p/discover/目录下,主要是实现了Kademlia算法来发现节点和维护节点列表。 现在代码中有两个版本V4和V5,基本功能都差不多。

- V4版

细分为3个部分:编解码器、负责维护节点信息的“K桶表”、逻辑控制器。
V4会使用以下6种类型的消息进行通信:

const (
   PingPacket = iota + 1
   PongPacket
   FindnodePacket    // 查找节点请求
   NeighborsPacket   // 查找节点回复,里面包含被查询节点及其周围的“邻居”(也可能不包含被查询节点)
   ENRRequestPacket  // 索要节点本身的信息(具体说明看下文)
   ENRResponsePacket // 回复自身的信息
)

-- 编解码器

在p2p/discover/v4wire/v4wire.go中,负责把数据转换为消息帧的形式,并将内容转为RLP(Recursive Length Prefix)编码。
消息帧分为帧头和数据区:帧头包含MAC校验码和发送者对消息的签名;数据区就是存放实际数据的地方,数据区的第一个字节是消息类型。
2.jpg

-- K桶表Table类

Table类在p2p/discover/table.go中,它负责维护节点信息。
Table中有几个主要的成员变量:

bucketSize      = 16 // Kademlia bucket中存放的节点个数
maxReplacements = 10 // Kademlia bucket中备用的节点个数

type node struct {
   enode.Node
   addedAt        time.Time // 这个节点被加到Table中的时间
   livenessChecks uint
}

type bucket struct {
   entries      []*node // 正式节点数组
   replacements []*node // 备用节点数组
   ...
}

type Table struct {
   mutex   sync.Mutex
   buckets [nBuckets]*bucket  // 一个距离一个bucket
   db         *enode.DB
   ...
}

这里node类其实就是前面介绍的节点信息Node类的一个扩展,添加了与时间相关的信息方便后续管理; bucket就是存放节点信息的篮子,且除了正式的数组外还有一个备用数组,在正式数组中节点不够的时候顶替上来;Table中有一个bucket数组,每个bucket都存放不同距离的节点,这的设计思路就和Kademlia是一样的。每个bucket的正式数组最多放16个节点,备用数组最多有10个节点。
在计算两个节点之间的距离时比较有意思:

// p2p/enode/node.go

func LogDist(a, b ID) int {
   lz := 0
   for i := range a {
      x := a[i] ^ b[i]
      if x == 0 {
         lz += 8
      } else {
         lz += bits.LeadingZeros8(x)
         break
      }
   }
   return len(a)*8 - lz
}

Kademlia原算法通过将两个节点ID进行异或计算得到节点间的距离。Geth与其相似,但又略有不同,Geth先通过异或两个节点ID得到前导零的个数lz(lz表示这两个ID有多少个比特位是相同的),再将ID总位数(256位)减lz得到N,2N(2的N次方)就是两个节点的距离值(即N=log2distance)。如果把节点ID放到一棵二叉树中的话,这种算法就有点像计算两个节点ID从哪一棵子树开始分叉的,因为节点ID长度是固定的,所以所有ID都会是叶子节点,因此只需要计算这两个ID是从哪棵子树开始分叉的就能知道这两个ID的距离了。
3.jpg
从一个ID出发计算和别的ID的距离时,每一个子树下的ID都是相同距离(因为ID长度是固定的,大家都是叶子节点),每一个分叉都会是一个bucket,这就导致bucket过多且浪费。Geth设置了一个bucket个数上限,并将一定距离(N<=239)的ID都合并到buckets[0]中:

// We keep buckets for the upper 1/15 of distances because
// it's very unlikely we'll ever encounter a node that's closer.
hashBits          = len(common.Hash{}) * 8 // (=256)
nBuckets          = hashBits / 15          // Number of buckets (=17)
bucketMinDistance = hashBits - nBuckets    // Log distance of closest bucket (=239)

接下来看看Table类是怎么更新维护节点信息的。在初始化的时候先将配置文件中的启动节点和数据库中的种子节点存到表中:

func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger) (*Table, error) {
   tab := &Table{
      ...
   }
   // 先将配置文件中的启动节点存入tab
   if err := tab.setFallbackNodes(bootnodes); err != nil {
      return nil, err
   }
   ...
   tab.loadSeedNodes()
   return tab, nil
}

func (tab *Table) loadSeedNodes() {
   // 从数据库中读取种子节点
   seeds := wrapNodes(tab.db.QuerySeeds(seedCount, seedMaxAge))
   seeds = append(seeds, tab.nursery...)
   for i := range seeds {
      seed := seeds[i]
      ...
      // 添加到tab中
      tab.addSeenNode(seed)
   }
}

func (tab *Table) addSeenNode(n *node) {
   if n.ID() == tab.self().ID() {
      return
   }

   tab.mutex.Lock()
   defer tab.mutex.Unlock()
   b := tab.bucket(n.ID()) // 根据ID距离获取对应的bucket
   if len(b.entries) >= bucketSize {
      // 这个bucket以及满了,加到备用的bucket中
      tab.addReplacement(b, n)
      return
   }
   // 加到bucket的末尾
   b.entries = append(b.entries, n)
   b.replacements = deleteNode(b.replacements, n)
   n.addedAt = time.Now()
   if tab.nodeAddedHook != nil {
      tab.nodeAddedHook(n)
   }
}

初始化完成后就可以调用Table的loop函数开始它的节点维护生涯,loop()中会定时调用两个关键函数doRefresh()和doRevalidate()。
doRefresh()会先调loadSeedNodes()从数据库中重新载入种子节点,这么做的原因是经过一段时间的节点信息更新,种子节点可能会从列表中被删除(每个bucket的节点数有上限),重新将种子节点载入有助于保证列表的有效性。之后向网络发起寻找自己和寻找3个随机ID的操作。这里的“寻找”操作具体怎么执行后面再讨论,暂时就把它理解为类似于Kademlia算法中的寻找节点操作。
doRevalidate()函数会从随机一个bucket中读取组后一个节点,然后ping它,Geth中UDP模块的pong包中会带有节点的seq值,doRevalidate()会判断如果pong包中的seq值比本地存有的seq值大,则还会再发一个请求给这个节点,向它索要它最新的节点信息(这个请求就是RequestENR,Request Eth Node Record,这个请求的具体细节在后面讨论)。如果这些操作都一起正常,这个节点就会被移动到bucket的最前面,否则从bucket中删除这个节点,并从bucket的备用列表中拿出一个节点来替代它。
除了Table类主动维护节点列表外,和别的节点有消息往来时也会将节点更新到列表中。

func (tab *Table) loop() {
   var (
      revalidate     = time.NewTimer(tab.nextRevalidateTime())   // 30秒
      refresh        = time.NewTicker(refreshInterval)         // 30分钟
      refreshDone    = make(chan struct{})
      revalidateDone chan struct{}
      ...
   )
   ...
   // 执行第一次初始化refresh
   go tab.doRefresh(refreshDone)

loop:
   for {
      select {
      case <-refresh.C:
         ...
         go tab.doRefresh(refreshDone)
      case req := <-tab.refreshReq:
         ...
         go tab.doRefresh(refreshDone)
      case <-revalidate.C:
         ...
         go tab.doRevalidate(revalidateDone)
      case <-tab.closeReq:
         break loop
      ...
      }
   }
   ...
   close(tab.closed)
}

func (tab *Table) doRefresh(done chan struct{}) {
   defer close(done)

   tab.loadSeedNodes()
   tab.net.lookupSelf()
   for i := 0; i < 3; i++ {
      tab.net.lookupRandom()
   }
}

func (tab *Table) doRevalidate(done chan<- struct{}) {
   defer func() { done <- struct{}{} }()

   // 随机从一个bucket中读取最后一个节点
   last, bi := tab.nodeToRevalidate()
   if last == nil {
      return
   }
   // ping这个节点
   remoteSeq, err := tab.net.ping(unwrapNode(last))
   ...
   // 如果对方回复的seq比本地存储的大,则向对方索要最新的节点信息
   if last.Seq() < remoteSeq {
      n, err := tab.net.RequestENR(unwrapNode(last))
      ...
      last = &node{Node: *n, addedAt: last.addedAt, livenessChecks: last.livenessChecks}
   }

   tab.mutex.Lock()
   defer tab.mutex.Unlock()
   b := tab.buckets[bi]
   if err == nil {
      // 一切顺利,把该节点移到bucket最前面
      last.livenessChecks++
      tab.bumpInBucket(b, last)
      return
   }
   // 更新失败,删除该节点,并从bucket的备用列表中取一个节点出来
   // 代替它
   tab.replace(b, last)
   ...
}

至此Table维护节点信息的工作就结束了,总结一下:在初始化的时候先载入种子节点,之后定时对整个列表做refresh操作和对单个节点执行revalidate操作。refresh操作是重新载入种子节点,并在网络中寻找自己和3个随机的节点ID;revalidate操作是测试节点的活性并更新节点信息;当有和别的节点有消息往来时,也会把节点更新到列表中。

-- 逻辑控制器

“逻辑控制器”这个名字是我自己取的,在代码中叫做UDPv4类,代码在p2p/discover/v4_udp.go中。所谓的逻辑控制器就是控制整个UDP模块运转的逻辑,它负责收发往来的UDP通信消息并调用前面的编解码器和Table类执行相应的操作,属于最上层的逻辑控制者。之前埋的两个坑也将在这一节详细介绍:向网络寻找某一个节点ID的的操作是如何实现的?向某一个具体的节点请求它的节点信息(RequestENR请求)是如何实现的?
既然有负责收发消息,那UDPv4就要拥有一个UDP的connection对象,这个connection对象来自于官方net包对一个UDP端口的监听,后续发送和接收消息都通过这个connection对象:

// p2p/server.go
func (srv *Server) setupDiscovery() error {
   ...
   // 用官方net包来监听一个UDP端口,并生成一个connection对象
   conn, err := net.ListenUDP("udp", addr)
   // 用生成的connection对象来初始化UDPv4对象
   ntab, err := discover.ListenV4(conn, srv.localnode, cfg)
   ...
}

// p2p/discover/v4_udp.go
// 这个函数会创建一个UDPv4对象(逻辑控制器),并启动它。在函数结束时返回这个对象
func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
   ...
   t := &UDPv4{
      // 用外面传进来的connection对象来初始化UDPv4对象
      conn: c,
      ...
   }

   tab, err := newTable(t, ln.Database(), cfg.Bootnodes, t.log)
   t.tab = tab
   // 启动Table类的关键循环
   go tab.loop()

   t.wg.Add(2)
   // 启动UDPv4类的两个关键循环
   go t.loop()
   go t.readLoop(cfg.Unhandled)
   return t, nil
}

讲UDPv4的两个关键循环之前,需要先讲讲它的消息匹配机制:如何将一个回应消息(response)和一个请求消息(request)相配对。UDPv4用一种叫replyMatcher的对象来完成这个操作:UDPv4每发一个请求消息,就生成一个replyMatcher对象,这种对象中包含期望的消息来源的ID、IP和消息类型,以及一个用来判断是否匹配的回调函数;每当有一个回应到来时,UDPv4就会遍历所有replyMatcher对象,判断ID、IP、消息类型是否匹配,并调用回调函数查看其返回值是否为true,如果一切匹配,UDPv4就把这个回应包复制到replyMatcher对象的reply字段里。因为代码中回调函数是以匿名函数的形式传进去的,生成时就带有了每个请求的特定信息和判断逻辑,所以能正确的匹配每一个回应和请求。同时这个函数除了返回是否匹配,还会返回这个请求是否还会有后续的回应(当前回应是否为该请求的最后一个回应),如果是则UDPv4就不再维护这个replyMatcher对象,后续有回应来也不会再去询问这个replyMatcher对象了:

type replyMatcher struct {
   from  enode.ID
   ip    net.IP
   ptype byte.
   callback replyMatchFunc
   errc chan error
   reply v4wire.Packet
   ...
}

接下来看看代码中是如何使用replyMatcher对象的,以ping操作为例,在发送ping请求前先创建一个replyMatcher对象并通过channel让负责匹配的协程收到它。这个replyMatcher对象在创建时被赋值了期望的ID、IP、消息类型,最主要的回调函数是以匿名函数的形式传入的,这个匿名函数中判断了pong包中带有的hash值是否和该ping请求的hash相等,如果相等则返回”true, true”代表“匹配,该消息已完成(后续不会再有这个请求的回应了)”,否则返回”false, false”代表“不匹配,该消息未完成(后续还会有这个请求的回应到来)”。在此之后,才会真正发送ping请求包出去。需要拿取pong回应包的话,就在replyMatcher对象的error channel上等待即可,error channel会输出两种值:nil或具体错误,如果输出的是nil则表明没有错误且reply字段已经准备就位了。
在收到回应包后,UDPv4会遍历所有未完成的replyMatcher对象,如果ID、IP、消息类型和回调函数都匹配,则就把这个回应包复制到replyMatcher对象的reply字段里,并往error channel里塞一个nil,如果回调函数还返回“该消息已完成”,则把这个replyMatcher对象从队列中移除,后续不再询问它了。

/ 为了方便展示,我把这个函数中的调用关系都展开了,原函数不长这样
func (t *UDPv4) ping(n *enode.Node) (seq uint64, err error) {
   // 创建ping数据包
   req := t.makePing(&net.UDPAddr{IP: n.IP(), Port: n.UDP()})
   // 调用编解码器将数据包编码
   packet, hash, err := v4wire.Encode(t.priv, req)
   // 创建replyMatcher对象
   p := &replyMatcher{
      // 包含了期望的ID、IP和消息类型
      from: n.ID(), ip: n.IP(), ptype: v4wire.PongPacket,
      errc: make(chan error, 1),
      // 回调函数判断pong中带有的hash是否和发出去的ping的hash相等
      callback: func(p v4wire.Packet) (matched bool, requestDone bool) {
         matched = bytes.Equal(p.(*v4wire.Pong).ReplyTok, hash)
         return matched, matched
      }
      ...
   }
   // 将replyMatcher对象传递给负责匹配的协程,也就是下面的loop函数
   t.addReplyMatcher <- p
   // 发送实际的ping包,底层是通过官方net包的UDPConn对象
   t.write(&net.UDPAddr{IP: n.IP(), Port: n.UDP()}, n.ID(), req.Name(), packet)
   // 阻塞在replyMatcher对象的error channel上等待pong的到来
   if err = <-p.errc; err == nil {
      pong := p.reply.(*v4wire.Pong)
   }
   ...
}

// 负责匹配的协程的关键函数,一个for循环,不断地匹配消息
func (t *UDPv4) loop() {
   ...
   for {
      select {
      // 将收到的replyMatcher对象放到队列中
      case p := <-t.addReplyMatcher:
         plist.PushBack(p)
      // 收到回应包后遍历队列中所有replyMatcher对象
      case r := <-t.gotreply:
         for el := plist.Front(); el != nil; el = el.Next() {
            p := el.Value.(*replyMatcher)
            // 判断ID、IP、消息类似是否匹配
            if p.from == r.from && p.ptype == r.data.Kind() && p.ip.Equal(r.ip) {
               // 调用replyMatcher的回调函数
               if ok, requestDone := p.callback(r.data); ok {
                  // 如果匹配就把回应包数据复制到replyMatcher的reply字段
                  p.reply = r.data
                  // 如果该请求不会有后续回应消息,就把它从队列中移除
                  if requestDone {
                     p.errc <- nil
                     plist.Remove(el)
                  }
               }
            }
         }
      // 将错误塞进到所有replyMatcher对象的error channel中,
      // 让另一端不再等待
      case <-t.closeCtx.Done():
         for el := plist.Front(); el != nil; el = el.Next() {
            el.Value.(*replyMatcher).errc <- errClosed
         }
         return
      ...
      }
   }
}

UDPv4有两个关键循环函数,一个是上面介绍的loop()负责请求和回应的匹配,另一个是readloop()负责从底层UDP连接中不断地读取数据,这两个函数涵盖了UDPv4模块的一生。
readloop()主要就做3件事:(1)从底层UDP连接中读取数据包;(2)如果这个数据包是一个请求则执行相应的操作,并发送回应数据包。如果这个是一个回应数据包,就通过gotreply channel发送给负责匹配的协程(执行loop循环函数的协程);(3)如果这个数据包是无法识别的类型或格式,则传递给上层处理。

func (t *UDPv4) readLoop(unhandled chan<- ReadPacket) {
   ...
   buf := make([]byte, maxPacketSize)
   for {
      // 从底层UDP连接中读取数据包
      nbytes, from, err := t.conn.ReadFromUDP(buf)
      // 在handlePacket中处理这个数据包,
      // 如果这个数据包是一个请求则执行相应的操作,并发送回应数据包。
      // 如果这个是一个回应数据包,就通过gotreply channel发送给负责匹配的协程
      if t.handlePacket(from, buf[:nbytes]) != nil && unhandled != nil {
         // unhandled是上层在创建UDPv4时传递进来的channel,上层通过这个
         // channel来接收UDPv4无法处理的数据包
         unhandled <- ReadPacket{buf[:nbytes], from}
      }
   }
}

处理消息的关键就是handlePacket()函数,下面详细说说对于每种消息UDPv4都会作何处理。
Ping消息:消息中有当前UDP模块的版本号,发送者和接收者的IP信息,消息过期的绝对时间戳,本地local node的seq值。UDPv4在发送Ping消息后,会阻塞等待Pong消息的到来。UDPv4在收到Ping消息后,会发送一个Pong消息回去。

Ping struct {
   Version    uint
   From, To   Endpoint
   Expiration uint64
   ENRSeq     uint64 `rlp:"optional"` // Sequence number of local record, added by EIP-868.

   // Ignore additional fields (for forward compatibility).
   Rest []rlp.RawValue `rlp:"tail"`
}

Pong消息:消息包含目标节点的IP信息(也就是发送Ping的那一方),与该Pong消息对应的那个Ping消息的hash值,消息过期的绝对时间戳,本地local node的seq值。UDPv4在收到Pong消息后,会判断Pong中的seq值是否自己存储的该节点的seq值大,如果是则会发送ENRRequest来请求最新的节点信息。

Pong struct {
   To         Endpoint
   ReplyTok   []byte // This contains the hash of the ping packet.
   Expiration uint64
   ENRSeq     uint64 `rlp:"optional"` // Sequence number of local record, added by EIP-868.

   // Ignore additional fields (for forward compatibility).
   Rest []rlp.RawValue `rlp:"tail"`
}

ENRRequest消息:消息中只包含了消息过期的绝对时间戳。ENRRequest请求的意思是直接向某个节点要它的节点信息(前面介绍的enode.Node结构体),所以不用包含其他额外的信息了。UDPv4在收到ENRRequest消息后,会直接把自己local node对象转换成Node对象,然后编码发出去。

ENRRequest struct {
   Expiration uint64
   // Ignore additional fields (for forward compatibility).
   Rest []rlp.RawValue `rlp:"tail"`
}

ENRResponse消息:ENRResponse就是ENRRequest的回应,包含了与之对应的ENRRequest消息的hash值,也包含了自己的节点信息档案(Record)。这个档案是从local node里转换出来的。UDPv4在收到ENRResponse消息后,就会把本地存的这个节点的节点信息更新。

ENRResponse struct {
   ReplyTok []byte // Hash of the ENRRequest packet.
   Record   enr.Record
   // Ignore additional fields (for forward compatibility).
   Rest []rlp.RawValue `rlp:"tail"`
}

Findnode消息:消息包含目标节点的公钥,消息过期的绝对时间戳。UDPv4在收到Findnode请求后先会把公钥转换成节点ID,之后在自己维护的“K桶表”(Table类)中找该节点的邻居(具体做法是找到目标节点在自己的Table中所在的bucket,然后把这个bucket里所有的节点都发出去),然后编码发送出去。

Findnode struct {
   Target     Pubkey
   Expiration uint64
   // Ignore additional fields (for forward compatibility).
   Rest []rlp.RawValue `rlp:"tail"`
}

Neighbors消息:消息包含自己知道的目标节点的邻居节点,消息过期的绝对时间戳。

Neighbors struct {
   Nodes      []Node
   Expiration uint64
   // Ignore additional fields (for forward compatibility).
   Rest []rlp.RawValue `rlp:"tail"`
}

之前埋的坑之一:“寻找节点”的操作具体是怎么实现的?当UDP模块开始寻找一个目标节点时,它会维护一个上限为16的节点队列,这个队列里以和目标节点的距离排序,离目标节点越近的节点排在越前面,UDP模块会一直从队列中找没询问过的节点,对它发Findnode请求询问目标节点附近的邻居,然后再用询问回来的结果跟新这个队列,直到这个队列不再更新,就代表已经收集到了离目标节点最近的几个邻居,“寻找节点”结束。完成这套逻辑的是lookup对象,代码在p2p/discover/lookup.go,它的关键函数是run(),run()循环调用advance(),advance()中调用的startQuerises()函数就是向队列中未询问过的节点发送Findnode请求(一次只会向1~3个发送,以防网络负载太重),询问操作是异步的,询问回来的结果会通过replyCh channel传回主协程。
4.jpg

func (it *lookup) run() []*enode.Node {
   for it.advance() {
   }
   return unwrapNodes(it.result.entries)
}

func (it *lookup) advance() bool {
   for it.startQueries() {
      select {
      // 结果通过replyCh channel传回主协程
      case nodes := <-it.replyCh:
         it.replyBuffer = it.replyBuffer[:0]
         for _, n := range nodes {
            if n != nil && !it.seen[n.ID()] {
               it.seen[n.ID()] = true
               it.result.push(n, bucketSize)
               it.replyBuffer = append(it.replyBuffer, n)
            }
         }
         ...
         if len(it.replyBuffer) > 0 {
            return true
         }
      ...
      }
   }
   return false
}

这里再额外讲一个迭代器,loopupIterator,后面会用到。loopupIterator是将lookup对象的run()函数拆分开来了,原本lookup对象是询问一个节点->更新列表->不断反复->直到列表不再更新,loopupIterator是询问一个节点后将结果作为迭代器的缓存并更新列表,当迭代器Next()函数遍历完这个缓存里的节点后,再询问下一个节点,再将下一个节点的结果作为迭代器的缓存并更新列表,直至缓存里的节点都遍历过了且列表不再更新,迭代器结束。

- V5版

V5版也是Kademlia算法的一种实现,所以基本功能和V4版本差不多。Table类和V4版共用一套代码,更新的是编解码器和逻辑控制器。编解码器上不再是简单地将消息转换为RLP(Recursive Length Prefix)编码格式,而是在此基础上将文本加密了。V5版本在启动两个节点之间地交流前,会先让两个节点进行一次握手,交换各自的公钥和随机数,用来生成后续的加密密码。逻辑控制器其实大同小异,原本V4的请求-回应配对是用replyMatcher对象,V5改成了callV5对象,思路基本都差不多。和V4最大的区别是V5不在P2P模块内部使用,V5主要是给外部模块用作UDP传输的底层组件,V5允许外部模块定制通信消息的头部(header),后续自动用这些头部封装和解析消息。值得注意的一点是Geth为了节约底层UDP传输资源,在同时启用V4和V5版本的UDP模块时,V5和V4底层用的是一个net.UCPConn对象,还记得之前介绍V4时说到V4无法解析的消息都会传到一个unhandle的channel中吗,V5就是从这个channel中读取消息的。