您现在的位置是:主页 > 币圈资讯 >

深入理解以太坊P2P网络设计(下)

2021-09-26 18:26币圈资讯 人已围观

简介 (*本文为《深入理解以太坊 P2P 网络设计》的下篇,为了更流畅阅读,请移步主页阅读上篇。) 终止服务 Stop 函数...

(*本文为《深入理解以太坊 P2P 网络设计》的下篇,为了更流畅阅读,请移步主页阅读上篇。)

终止服务

Stop 函数用于终止节点运行,具体代码如下所示:

func (srv *Server) Stop() {    srv.lock.Lock()    if !srv.running {        srv.lock.Unlock()        return   }    srv.running = false    if srv.listener != nil {        // this unblocks listener Accept        srv.listener.Close()   }    close(srv.quit)    srv.lock.Unlock()    srv.loopWG.Wait() }

服务启动

位于 go-ethereum-1.10.2\p2p\server.go中的 start 函数用于启动一个P2P节点:

// filedir:go-ethereum-1.10.2\p2p\server.go L433 func (srv *Server) Start() (err error) {    srv.lock.Lock()    defer srv.lock.Unlock()    if srv.running {        return errors.New("server already running")   }    srv.running = true    srv.log = srv.Config.Logger    if srv.log == nil {        srv.log = log.Root()   }    if srv.clock == nil {        srv.clock = mclock.System{}   }    if srv.NoDial && srv.ListenAddr == "" {        srv.log.Warn("P2P server will be useless, neither dialing nor listening")   }    // static fields    if srv.PrivateKey == nil {        return errors.New("Server.PrivateKey must be set to a non-nil key")   }    if srv.newTransport == nil {        srv.newTransport = newRLPX   }    if srv.listenFunc == nil {        srv.listenFunc = net.Listen   }    srv.quit = make(chan struct{})    srv.delpeer = make(chan peerDrop)    srv.checkpointPostHandshake = make(chan *conn)    srv.checkpointAddPeer = make(chan *conn)    srv.addtrusted = make(chan *enode.Node)    srv.removetrusted = make(chan *enode.Node)    srv.peerOp = make(chan peerOpFunc)    srv.peerOpDone = make(chan struct{})    if err := srv.setupLocalNode(); err != nil {        return err   }    if srv.ListenAddr != "" {        if err := srv.setupListening(); err != nil {            return err       }   }    if err := srv.setupDiscovery(); err != nil {        return err   }    srv.setupDialScheduler()    srv.loopWG.Add(1)    go srv.run()    return nil }

在这里首先检查当前节点是否处于运行状态,如果是则直接返回并给出错误提示信息,如果不是则将 srv.running 设置为 true,之后进入服务启动流程,之后检查 log 是否开启等,之后初始化配置P2P服务信息:

// Start starts running the server. // Servers can not be re-used after stopping. func (srv *Server) Start() (err error) {    srv.lock.Lock()    defer srv.lock.Unlock()    if srv.running {        return errors.New("server already running")   }    srv.running = true    srv.log = srv.Config.Logger    if srv.log == nil {        srv.log = log.Root()   }    if srv.clock == nil {        srv.clock = mclock.System{}   }    if srv.NoDial && srv.ListenAddr == "" {        srv.log.Warn("P2P server will be useless, neither dialing nor listening")   }    // static fields    if srv.PrivateKey == nil {        return errors.New("Server.PrivateKey must be set to a non-nil key")   }    if srv.newTransport == nil {        srv.newTransport = newRLPX   }    if srv.listenFunc == nil {        srv.listenFunc = net.Listen   }    srv.quit = make(chan struct{})    srv.delpeer = make(chan peerDrop)    srv.checkpointPostHandshake = make(chan *conn)    srv.checkpointAddPeer = make(chan *conn)    srv.addtrusted = make(chan *enode.Node)    srv.removetrusted = make(chan *enode.Node)    srv.peerOp = make(chan peerOpFunc)    srv.peerOpDone = make(chan struct{})

之后调用 setupLocalNode 来启动一个本地节点,并建立本地监听,然后配置一个 DiscoveryV5 网络协议,生成节点路由表。

调用 setupDialScheduler 启动主动拨号连接过程,然后开一个协程,在其中做 peer 的维护:

   srv.setupDialScheduler()    srv.loopWG.Add(1)    go srv.run()    return nil }

setupDialScheduler 代码如下所示,这里通过 newDialScheduler 来建立连接,参数 discmix 确定了进行主动建立连接时的节点集,它是一个迭代器 ,同时将 setupConn 连接建立函数传入:

func (srv *Server) setupDialScheduler() {    config := dialConfig{        self:           srv.localnode.ID(),        maxDialPeers:   srv.maxDialedConns(),        maxActiveDials: srv.MaxPendingPeers,        log:            srv.Logger,        netRestrict:    srv.NetRestrict,        dialer:         srv.Dialer,        clock:          srv.clock,   }    if srv.ntab != nil {        config.resolver = srv.ntab   }    if config.dialer == nil {        config.dialer = tcpDialer{&net.Dialer{Timeout: defaultDialTimeout}}   }    srv.dialsched = newDialScheduler(config, srv.discmix, srv.SetupConn)    for _, n := range srv.StaticNodes {        srv.dialsched.addStatic(n)   } }

newDialScheduler 函数如下所示,在这里通过 d.readNodes(it) 从迭代器中取得节点,之后通过通道传入 d.loop(it) 中进行连接:

// filedir:go-ethereum-1.10.2\p2p\dial.go   L162 func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupFunc) *dialScheduler {    d := &dialScheduler{        dialConfig:  config.withDefaults(),        setupFunc:   setupFunc,        dialing:     make(map[enode.ID]*dialTask),        static:      make(map[enode.ID]*dialTask),        peers:       make(map[enode.ID]connFlag),        doneCh:      make(chan *dialTask),        nodesIn:     make(chan *enode.Node),        addStaticCh: make(chan *enode.Node),        remStaticCh: make(chan *enode.Node),        addPeerCh:   make(chan *conn),        remPeerCh:   make(chan *conn),   }    d.lastStatsLog = d.clock.Now()    d.ctx, d.cancel = context.WithCancel(context.Background())    d.wg.Add(2)    go d.readNodes(it)    go d.loop(it)    return d }

服务监听

在上面的服务启动过程中有一个 setupListening 函数,该函数用于监听事件,具体代码如下所示:

func (srv *Server) setupListening() error {    // Launch the listener.    listener, err := srv.listenFunc("tcp", srv.ListenAddr)    if err != nil {        return err   }    srv.listener = listener    srv.ListenAddr = listener.Addr().String()    // Update the local node record and map the TCP listening port if NAT is configured.    if tcp, ok := listener.Addr().(*net.TCPAddr); ok {        srv.localnode.Set(enr.TCP(tcp.Port))        if !tcp.IP.IsLoopback() && srv.NAT != nil {            srv.loopWG.Add(1)            go func() {                nat.Map(srv.NAT, srv.quit, "tcp", tcp.Port, tcp.Port, "ethereum p2p")                srv.loopWG.Done()           }()       }   }    srv.loopWG.Add(1)    go srv.listenLoop()    return nil }

在上述代码中又调用了一个 srv.listenLoop(),该函数是一个死循环的 goroutine,它会监听端口并接收外部的请求:

// listenLoop runs in its own goroutine and accepts // inbound connections. func (srv *Server) listenLoop() {    srv.log.Debug("TCP listener up", "addr", srv.listener.Addr())    // The slots channel limits accepts of new connections.    tokens := defaultMaxPendingPeers    if srv.MaxPendingPeers > 0 {        tokens = srv.MaxPendingPeers   }    slots := make(chan struct{}, tokens)    for i := 0; i < tokens; i++ {        slots <- struct{}{}   }    // Wait for slots to be returned on exit. This ensures all connection goroutines    // are down before listenLoop returns.    defer srv.loopWG.Done()    defer func() {        for i := 0; i < cap(slots); i++ {            <-slots       }   }()    for {        // Wait for a free slot before accepting.        <-slots        var (            fd      net.Conn            err     error            lastLog time.Time       )        for {            fd, err = srv.listener.Accept()            if netutil.IsTemporaryError(err) {                if time.Since(lastLog) > 1*time.Second {                    srv.log.Debug("Temporary read error", "err", err)                    lastLog = time.Now()               }                time.Sleep(time.Millisecond * 200)                continue           } else if err != nil {                srv.log.Debug("Read error", "err", err)                slots <- struct{}{}                return           }            break       }        remoteIP := netutil.AddrIP(fd.RemoteAddr())        if err := srv.checkInboundConn(remoteIP); err != nil {            srv.log.Debug("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err)            fd.Close()            slots <- struct{}{}            continue       }        if remoteIP != nil {            var addr *net.TCPAddr            if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {                addr = tcp           }            fd = newMeteredConn(fd, true, addr)            srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())       }        go func() {            srv.SetupConn(fd, inboundConn, nil)            slots <- struct{}{}       }()   } }

这里的 SetupConn 主要执行执行握手协议,并尝试把链接创建为一个 peer 对象:

// SetupConn runs the handshakes and attempts to add the connection // as a peer. It returns when the connection has been added as a peer // or the handshakes have failed. func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {    c := &conn{fd: fd, flags: flags, cont: make(chan error)}    if dialDest == nil {        c.transport = srv.newTransport(fd, nil)   } else {        c.transport = srv.newTransport(fd, dialDest.Pubkey())   }    err := srv.setupConn(c, flags, dialDest)    if err != nil {        c.close(err)   }    return err }

在上述代码中又去调用了 srv.setupConn(c, flags, dialDest) 函数,该函数用于执行握手协议:

func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) error {    // Prevent leftover pending conns from entering the handshake.    srv.lock.Lock()    running := srv.running    srv.lock.Unlock()    if !running {        return errServerStopped   }    // If dialing, figure out the remote public key.    var dialPubkey *ecdsa.PublicKey    if dialDest != nil {    // dest=nil 被动连接,dest!=nil主动连接诶        dialPubkey = new(ecdsa.PublicKey)        if err := dialDest.Load((*enode.Secp256k1)(dialPubkey)); err != nil {            err = errors.New("dial destination doesn't have a secp256k1 public key")            srv.log.Trace("Setting up connection failed", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)            return err       }   }    // Run the RLPx handshake.    remotePubkey, err := c.doEncHandshake(srv.PrivateKey)    // 公钥交换,确定共享秘钥RLPx层面的握手一来一去    if err != nil {        srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)        return err   }    if dialDest != nil {        c.node = dialDest   } else {        c.node = nodeFromConn(remotePubkey, c.fd)   }    clog := srv.log.New("id", c.node.ID(), "addr", c.fd.RemoteAddr(), "conn", c.flags)    err = srv.checkpoint(c, srv.checkpointPostHandshake)    if err != nil {        clog.Trace("Rejected peer", "err", err)        return err   }    // Run the capability negotiation handshake.    phs, err := c.doProtoHandshake(srv.ourHandshake)  // 进行协议层面的握手,也即p2p握手,一来一去    if err != nil {        clog.Trace("Failed p2p handshake", "err", err)        return err   }    if id := c.node.ID(); !bytes.Equal(crypto.Keccak256(phs.ID), id[:]) {        clog.Trace("Wrong devp2p handshake identity", "phsid", hex.EncodeToString(phs.ID))        return DiscUnexpectedIdentity   }    c.caps, c.name = phs.Caps, phs.Name    err = srv.checkpoint(c, srv.checkpointAddPeer)  // 状态校验    if err != nil {        clog.Trace("Rejected peer", "err", err)        return err   }    return nil }

秘钥握手通过 deEncHandshake 函数实现,在函数之中调用了 Handshake() 函数:

// filedir:go-ethereum-1.10.2\p2p\transport.go L123 func (t *rlpxTransport) doEncHandshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) {    t.conn.SetDeadline(time.Now().Add(handshakeTimeout))    return t.conn.Handshake(prv) }

Handshake 代码如下所示,在这里会根据是主动握手还是被动握手来进行执行对应的握手逻辑:

// filedir:go-ethereum-1.10.2\p2p\rlpx\rlpx.go   L253 func (c *Conn) Handshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) {    var (        sec Secrets        err error   )    if c.dialDest != nil {   //主动握手        sec, err = initiatorEncHandshake(c.conn, prv, c.dialDest)  //主动发起秘钥验证握手结束,确定共享秘钥   } else { // 被动握手        sec, err = receiverEncHandshake(c.conn, prv)   }    if err != nil {        return nil, err   }    c.InitWithSecrets(sec)    return sec.remote, err }

主动发起握手过程过程如下,在这里会调用 makeAuthMsg 来生成 Auth 身份信息,包含签名,随机 nonce 生成的与签名对应的公钥和版本号,之后调用 sealEIP8 方法进行 rlpx 编码,之后发起加密握手,之后接收返回的 authResp 消息,并验证解密,获取对方公钥,之后生成 AES,MAC:

// filedir:go-ethereum-1.10.2\p2p\rlpx\rlpx.go L477 func initiatorEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey, remote *ecdsa.PublicKey) (s Secrets, err error) {    h := &encHandshake{initiator: true, remote: ecies.ImportECDSAPublic(remote)}    authMsg, err := h.makeAuthMsg(prv)    if err != nil {        return s, err   }    authPacket, err := sealEIP8(authMsg, h)    if err != nil {        return s, err   }    if _, err = conn.Write(authPacket); err != nil {        return s, err   }    authRespMsg := new(authRespV4)    authRespPacket, err := readHandshakeMsg(authRespMsg, encAuthRespLen, prv, conn)    if err != nil {        return s, err   }    if err := h.handleAuthResp(authRespMsg); err != nil {        return s, err   }    return h.secrets(authPacket, authRespPacket) }

receiverEncHandshake 如下所示,和 initiatorEncHandshake 相差无几:

func receiverEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey) (s Secrets, err error) {    authMsg := new(authMsgV4)    authPacket, err := readHandshakeMsg(authMsg, encAuthMsgLen, prv, conn)    if err != nil {        return s, err   }    h := new(encHandshake)    if err := h.handleAuthMsg(authMsg, prv); err != nil {        return s, err   }    authRespMsg, err := h.makeAuthResp()    if err != nil {        return s, err   }    var authRespPacket []byte    if authMsg.gotPlain {        authRespPacket, err = authRespMsg.sealPlain(h)   } else {        authRespPacket, err = sealEIP8(authRespMsg, h)   }    if err != nil {        return s, err   }    if _, err = conn.Write(authRespPacket); err != nil {        return s, err   }    return h.secrets(authPacket, authRespPacket) }

之后通过doProtoHandshake来完成协议握手操作,在这里调用send发送一次握手操作,之后通过readProtocolHandshake来读取返回信息,之后进行检查:

func (t *rlpxTransport) doProtoHandshake(our *protoHandshake) (their *protoHandshake, err error) {

   werr := make(chan error, 1)    go func() { werr <- Send(t, handshakeMsg, our) }()       if their, err = readProtocolHandshake(t); err != nil {        <-werr // make sure the write terminates too        return nil, err   }    if err := <-werr; err != nil {        return nil, fmt.Errorf("write error: %v", err)   }    // If the protocol version supports Snappy encoding, upgrade immediately    t.conn.SetSnappy(their.Version >= snappyProtocolVersion)    return their, nil }

服务循环

run函数是服务的主循环,监听服务器终止、增加信任节点、移除信任节点、增加检查节点等:

func (srv *Server) run() {    srv.log.Info("Started P2P networking", "self", srv.localnode.Node().URLv4())    defer srv.loopWG.Done()    defer srv.nodedb.Close()    defer srv.discmix.Close()    defer srv.dialsched.stop()    var (        peers        = make(map[enode.ID]*Peer)        inboundCount = 0        trusted      = make(map[enode.ID]bool, len(srv.TrustedNodes))   )

   for _, n := range srv.TrustedNodes {        trusted[n.ID()] = true   } running:    for {        select {        case <-srv.quit:

           break running        case n := <-srv.addtrusted:

           srv.log.Trace("Adding trusted node", "node", n)            trusted[n.ID()] = true            if p, ok := peers[n.ID()]; ok {                p.rw.set(trustedConn, true)           }        case n := <-srv.removetrusted:

           srv.log.Trace("Removing trusted node", "node", n)            delete(trusted, n.ID())            if p, ok := peers[n.ID()]; ok {                p.rw.set(trustedConn, false)           }        case op := <-srv.peerOp:

           op(peers)            srv.peerOpDone <- struct{}{}        case c := <-srv.checkpointPostHandshake:

           if trusted[c.node.ID()] {

               c.flags |= trustedConn           }

           c.cont <- srv.postHandshakeChecks(peers, inboundCount, c)        case c := <-srv.checkpointAddPeer:

           err := srv.addPeerChecks(peers, inboundCount, c)            if err == nil {

               p := srv.launchPeer(c)                peers[c.node.ID()] = p                srv.log.Debug("Adding p2p peer", "peercount", len(peers), "id", p.ID(), "conn", c.flags, "addr", p.RemoteAddr(), "name", p.Name())                srv.dialsched.peerAdded(c)                if p.Inbound() {                    inboundCount++               }           }            c.cont <- err        case pd := <-srv.delpeer:            // A peer disconnected.            d := common.PrettyDuration(mclock.Now() - pd.created)            delete(peers, pd.ID())            srv.log.Debug("Removing p2p peer", "peercount", len(peers), "id", pd.ID(), "duration", d, "req", pd.requested, "err", pd.err)            srv.dialsched.peerRemoved(pd.rw)            if pd.Inbound() {                inboundCount--           }       }   }    srv.log.Trace("P2P networking is spinning down")

   if srv.ntab != nil {        srv.ntab.Close()   }    if srv.DiscV5 != nil {        srv.DiscV5.Close()   }

   for _, p := range peers {        p.Disconnect(DiscQuitting)   }

   for len(peers) > 0 {        p := <-srv.delpeer        p.log.Trace("<-delpeer (spindown)")        delete(peers, p.ID())   } }

节点信息

NodeInfo用于查看节点信息,PeersInfo用于查看连接的节点信息:

// NodeInfo gathers and returns a collection of metadata known about the host. func (srv *Server) NodeInfo() *NodeInfo {

   node := srv.Self()    info := &NodeInfo{        Name:       srv.Name,        Enode:      node.URLv4(),        ID:         node.ID().String(),        IP:         node.IP().String(),        ListenAddr: srv.ListenAddr,        Protocols:  make(map[string]interface{}),   }    info.Ports.Discovery = node.UDP()    info.Ports.Listener = node.TCP()    info.ENR = node.String()

   for _, proto := range srv.Protocols {        if _, ok := info.Protocols[proto.Name]; !ok {            nodeInfo := interface{}("unknown")            if query := proto.NodeInfo; query != nil {                nodeInfo = proto.NodeInfo()           }            info.Protocols[proto.Name] = nodeInfo       }   }    return info }

func (srv *Server) PeersInfo() []*PeerInfo {    // Gather all the generic and sub-protocol specific infos    infos := make([]*PeerInfo, 0, srv.PeerCount())    for _, peer := range srv.Peers() {        if peer != nil {            infos = append(infos, peer.Info())       }   }    // Sort the result array alphabetically by node identifier    for i := 0; i < len(infos); i++ {        for j := i + 1; j < len(infos); j++ {            if infos[i].ID > infos[j].ID {                infos[i], infos[j] = infos[j], infos[i]           }       }   }    return infos }

请求处理

下面为peer.run函数的代码:

func (p *Peer) run() (remoteRequested bool, err error) {    var (        writeStart = make(chan struct{}, 1)        writeErr   = make(chan error, 1)        readErr    = make(chan error, 1)        reason     DiscReason // sent to the peer   )    p.wg.Add(2)    go p.readLoop(readErr)    go p.pingLoop()    // Start all protocol handlers.    writeStart <- struct{}{}    p.startProtocols(writeStart, writeErr)    // Wait for an error or disconnect. loop:    for {        select {        case err = <-writeErr:            // A write finished. Allow the next write to start if            // there was no error.            if err != nil {                reason = DiscNetworkError                break loop           }            writeStart <- struct{}{}        case err = <-readErr:            if r, ok := err.(DiscReason); ok {                remoteRequested = true                reason = r           } else {                reason = DiscNetworkError           }            break loop        case err = <-p.protoErr:            reason = discReasonForError(err)            break loop        case err = <-p.disc:            reason = discReasonForError(err)            break loop       }   }    close(p.closed)    p.rw.close(reason)    p.wg.Wait()    return remoteRequested, err }

从上述代码中可以看到函数的开头首先定义了一些局部变量,之后启用了两个协程,一个是readLoop,它通过调用ReadMsg()读取msg,之后又通过调用peer.handle(msg)来处理msg

如果msg是pingMsg,则发送一个pong回应,如果msg与下述特殊情况不相匹配则将msg交给proto.in通道,等待protocolManager.handleMsg()从通道中取出。另一个协程是pingLoop,它主要通过调用SendItems(p.rw, pingMsg)来发起ping请求

之后调用starProtocols()函数让协议运行起来:

func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) {    p.wg.Add(len(p.running))    for _, proto := range p.running {        proto := proto        proto.closed = p.closed        proto.wstart = writeStart        proto.werr = writeErr        var rw MsgReadWriter = proto        if p.events != nil {            rw = newMsgEventer(rw, p.events, p.ID(), proto.Name, p.Info().Network.RemoteAddress, p.Info().Network.LocalAddress)       }        p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version))        go func() {            defer p.wg.Done()            err := proto.Run(p, rw)            if err == nil {                p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version))                err = errProtocolReturned           } else if err != io.EOF {                p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err)           }            p.protoErr <- err       }()   } }

最后通过一个loop循环来处理错误或者断开连接等操作:

   // Wait for an error or disconnect. loop:    for {        select {        case err = <-writeErr:            // A write finished. Allow the next write to start if            // there was no error.            if err != nil {                reason = DiscNetworkError                break loop           }            writeStart <- struct{}{}        case err = <-readErr:            if r, ok := err.(DiscReason); ok {                remoteRequested = true                reason = r           } else {                reason = DiscNetworkError           }            break loop        case err = <-p.protoErr:            reason = discReasonForError(err)            break loop        case err = <-p.disc:            reason = discReasonForError(err)            break loop       }   }    close(p.closed)    p.rw.close(reason)    p.wg.Wait()    return remoteRequested, err

创数据库

newPersistentDB函数用于创建一个持久化的数据库用于存储节点信息:

// filedir:go-ethereum-1.10.2\p2p\enode\nodedb.go L95 // newPersistentNodeDB creates/opens a leveldb backed persistent node database, // also flushing its contents in case of a version mismatch. func newPersistentDB(path string) (*DB, error) {    opts := &opt.Options{OpenFilesCacheCapacity: 5}    db, err := leveldb.OpenFile(path, opts)    if _, iscorrupted := err.(*errors.ErrCorrupted); iscorrupted {        db, err = leveldb.RecoverFile(path, nil)   }    if err != nil {        return nil, err   }

   currentVer := make([]byte, binary.MaxVarintLen64)    currentVer = currentVer[:binary.PutVarint(currentVer, int64(dbVersion))]    blob, err := db.Get([]byte(dbVersionKey), nil)    switch err {    case leveldb.ErrNotFound:        // Version not found (i.e. empty cache), insert it        if err := db.Put([]byte(dbVersionKey), currentVer, nil); err != nil {            db.Close()            return nil, err       }    case nil:        // Version present, flush if different        if !bytes.Equal(blob, currentVer) {            db.Close()            if err = os.RemoveAll(path); err != nil {                return nil, err           }            return newPersistentDB(path)       }   }    return &DB{lvl: db, quit: make(chan struct{})}, nil }

节点超时

ensureExpirer函数用于检查节点是否超时,具体实现代码如下所示:

func (db *DB) ensureExpirer() {    db.runner.Do(func() { go db.expirer() }) }

func (db *DB) expirer() {    tick := time.NewTicker(dbCleanupCycle)    defer tick.Stop()    for {        select {        case <-tick.C:            db.expireNodes()        case <-db.quit:            return       }   } }

func (db *DB) expireNodes() {    it := db.lvl.NewIterator(util.BytesPrefix([]byte(dbNodePrefix)), nil)    defer it.Release()    if !it.Next() {        return   }    var (        threshold    = time.Now().Add(-dbNodeExpiration).Unix()        youngestPong int64        atEnd        = false   )    for !atEnd {        id, ip, field := splitNodeItemKey(it.Key())        if field == dbNodePong {            time, _ := binary.Varint(it.Value())            if time > youngestPong {                youngestPong = time           }            if time < threshold {                // Last pong from this IP older than threshold, remove fields belonging to it.                deleteRange(db.lvl, nodeItemKey(id, ip, ""))           }       }        atEnd = !it.Next()        nextID, _ := splitNodeKey(it.Key())        if atEnd || nextID != id {

           if youngestPong > 0 && youngestPong < threshold {                deleteRange(db.lvl, nodeKey(id))           }            youngestPong = 0       }   } }

状态更新

下面是一些状态更新函数:

// LastPingReceived retrieves the time of the last ping packet received from // a remote node. func (db *DB) LastPingReceived(id ID, ip net.IP) time.Time {    if ip = ip.To16(); ip == nil {        return time.Time{}   }    return time.Unix(db.fetchInt64(nodeItemKey(id, ip, dbNodePing)), 0) } // UpdateLastPingReceived updates the last time we tried contacting a remote node. func (db *DB) UpdateLastPingReceived(id ID, ip net.IP, instance time.Time) error {    if ip = ip.To16(); ip == nil {        return errInvalidIP   }    return db.storeInt64(nodeItemKey(id, ip, dbNodePing), instance.Unix()) } // LastPongReceived retrieves the time of the last successful pong from remote node. func (db *DB) LastPongReceived(id ID, ip net.IP) time.Time {    if ip = ip.To16(); ip == nil {        return time.Time{}   }    // Launch expirer    db.ensureExpirer()    return time.Unix(db.fetchInt64(nodeItemKey(id, ip, dbNodePong)), 0) } // UpdateLastPongReceived updates the last pong time of a node. func (db *DB) UpdateLastPongReceived(id ID, ip net.IP, instance time.Time) error {    if ip = ip.To16(); ip == nil {        return errInvalidIP   }    return db.storeInt64(nodeItemKey(id, ip, dbNodePong), instance.Unix()) } // FindFails retrieves the number of findnode failures since bonding. func (db *DB) FindFails(id ID, ip net.IP) int {    if ip = ip.To16(); ip == nil {        return 0   }    return int(db.fetchInt64(nodeItemKey(id, ip, dbNodeFindFails))) } // UpdateFindFails updates the number of findnode failures since bonding. func (db *DB) UpdateFindFails(id ID, ip net.IP, fails int) error {    if ip = ip.To16(); ip == nil {        return errInvalidIP   }    return db.storeInt64(nodeItemKey(id, ip, dbNodeFindFails), int64(fails)) } // FindFailsV5 retrieves the discv5 findnode failure counter. func (db *DB) FindFailsV5(id ID, ip net.IP) int {    if ip = ip.To16(); ip == nil {        return 0   }    return int(db.fetchInt64(v5Key(id, ip, dbNodeFindFails))) } // UpdateFindFailsV5 stores the discv5 findnode failure counter. func (db *DB) UpdateFindFailsV5(id ID, ip net.IP, fails int) error {    if ip = ip.To16(); ip == nil {        return errInvalidIP   }    return db.storeInt64(v5Key(id, ip, dbNodeFindFails), int64(fails)) }

节点挑选

QuerySeeds函数用于从数据库里面随机挑选合适种子节点:

// QuerySeeds retrieves random nodes to be used as potential seed nodes // for bootstrapping. func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node {    var (        now   = time.Now()        nodes = make([]*Node, 0, n)        it    = db.lvl.NewIterator(nil, nil)        id    ID   )    defer it.Release() seek:    for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {

       ctr := id[0]        rand.Read(id[:])        id[0] = ctr + id[0]        it.Seek(nodeKey(id))        n := nextNode(it)        if n == nil {            id[0] = 0            continue seek // iterator exhausted       }        if now.Sub(db.LastPongReceived(n.ID(), n.IP())) > maxAge {            continue seek       }        for i := range nodes {            if nodes[i].ID() == n.ID() {                continue seek // duplicate           }       }        nodes = append(nodes, n)   }    return nodes }

func nextNode(it iterator.Iterator) *Node {    for end := false; !end; end = !it.Next() {        id, rest := splitNodeKey(it.Key())        if string(rest) != dbDiscoverRoot {            continue       }        return mustDecodeNode(id[:], it.Value())   }    return nil }

总结

P2P网络是区块链分布式网络结构的基础,本篇文章详细介绍了P2P网络的基本原理,包括节点发现机制、分布式哈希表、节点查找、节点新增、节点移除、请求处理等,同时从源码角度对以太坊源码中P2P网络的实现做了较为细致的分析,探索了以太坊P2P网络的工作流程以以及安全设计,而公链安全体系的建设依旧是长路漫漫,有待进一步深入探索。

Tags:

标签云

站点信息