以太坊P2P唐佳良 模块使用示例-DivingInto以太坊

P2P综天仙路 模块(以下称devp2p)在以太坊中用于节点发现、交易,区块金喻良缘 ,帐户等数据同步。波力斯卡 从本篇开始,我们开始介绍 geth 客户端 devp2p 的使用方法及它的原理和源码分析。本系列文章大致分成3个部分,首先从一个简单的示例了解 devp2p 向上层提供的接口,然后讨论 p2p 节点发现协议,包括路由表的建立过程和握手协议。最后我们再分析底层的数据传输协议。示例
devp2p 除了应用于以太坊,其本身也可单独使用,我们先拓展以太坊wiki上的示例,它的功能是向与它连接的节点不停的发送随机数吐司网,接收到随机数的节点将它打印出来。
package main
import (
"fmt"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"gopkg.in/urfave/cli.v1"
"log"
"math/rand"
"os"
"sync"
"time"
)
var (
portint
bootnode string
)
const (
msgCodeRandNum = 0
msgLength= iota
)
func main() {
app := cli.NewApp()
app.Usage = "p2p package demo"
app.Action = startP2pNode
app.Flags = []cli.Flag{
cli.IntFlag{Name: "port", Value: 11200, Usage: "listen port", Destination: &port},
cli.StringFlag{Name: "bootnode", Value: "", Usage: "boot node", Destination: &bootnode},
}
if err := app.Run(os.Args); err != nil {
log.Fatal(err)
}
}
func startP2pNode(c *cli.Context) error {
emitter := NewEmitter()
nodeKey, _ := crypto.GenerateKey()
node := p2p.Server{
Config: p2p.Config{
MaxPeers: 100依灵修仙记 ,
PrivateKey: nodeKey,
Name: "p2pDemo",
ListenAddr: fmt.Sprintf(":%d"千牛帮 , port)李恩熙,
Protocols: []p2p.Protocol{emitter.MyProtocol()},
},
}
bootNode, err := discover.ParseNode(bootnode)
if err != nil {
return err
}
node.Config.BootstrapNodes = []*discover.Node{bootNode}
if err := node.Start(); err != nil {
return err
}
emitter.self = node.NodeInfo().ID[:8]
go emitter.loopSendMsg()
select {}
return nil
}
func (e *Emitter) MyProtocol() p2p.Protocol {
return p2p.Protocol{
Name: "rad",
Version: 1,
Length: msgLength,
Run:e.msgHandler,
}
}
type peer struct {
peer *p2p.Peer
ws p2p.MsgReadWriter
}
type Emitter struct {
self string
peers map[string]*peer
sync.Mutex
}
func NewEmitter() *Emitter {
return &Emitter{peers: make(map[string]*peer)}
}
func (e *Emitter) addPeer(p *p2p.Peer, ws p2p.MsgReadWriter) {
e.Lock()
defer e.Unlock()
id := fmt.Sprintf("%x", p.ID().String()[:8])
e.peers[id] = &peer{ws: ws, peer: p}
}
func (e *Emitter) loopSendMsg() {
for {
func() {
e.Lock()
defer e.Unlock()
msg := fmt.Sprintf("%s_%d", e.self, rand.Int())
for _, p := range e.peers {
if err := p2p.SendItems(p.ws, msgCodeRandNum, msg); err != nil {
log.Println("Emitter.loopSendMsg p2p.SendItems err", err, "peer id", p.peer.ID())
continue
}
}
}()
time.Sleep(time.Second * 3)
}
}
func (e *Emitter) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error {
e.addPeer(peer, ws)
for {
msg, err := ws.ReadMsg()
if err != nil {
return err
}
switch msg.Code {
case msgCodeRandNum:
var myMessage []string
if err := msg.Decode(&myMessage); err != nil {
log.Println("decode msg err", err)
} else {
log.Println("read msg:", myMessage[0])
}
default:
log.Println("unkown msg code")
}
}
return nil
}运行
假设编译出来的二进制文件叫 p2pdemo,首先我们启动以太坊的 bootnode 程序(参考:搭建本地以太坊网络),获取bootnode的地址,假设是:
enode://6a6b8dbab94c9479462f7dfb8c16a3c78e3f4fca8ee648a90a6ff556e08380201c8a7109329c388af57ff707bab60f26a6fa0de91a93f0878ec31da50aa5e7bf@127.0.0.1:30301
然后另开两个终端,分别输入:
# 终端1输入
$./p2pdemo --port 3401 --bootnode enode://6a6b8dbab94c9479462f7dfb8c16a3c78e3f4fca8ee648a90a6ff556e08380201c8a7109329c388af57ff707bab60f26a6fa0de91a93f0878ec31da50aa5e7bf@127.0.0.1:30301
# 终端2输入
$./p2pdemo --port 3402 --bootnode enode://6a6b8dbab94c9479462f7dfb8c16a3c78e3f4fca8ee648a90a6ff556e08380201c8a7109329c388af57ff707bab60f26a6fa0de91a93f0878ec31da50aa5e7bf@127.0.0.1:30301
没有问题的话,两个节点都能应该能接收到对方发送的随机数。源码分析
下面我们来分析上面的示例代码。
启动节点时,需要指定其 PriviateKey ,它是一个椭圆非对称密钥,节点之间建立握手协议时会使用它,同时 PublicKey 也作为节点的ID,起到唯一标识的作用。节点支持的协议由 Protocol 定义,需要在配置字段中指定,它的主要定义如下:
// Protocol represents a P2P subprotocol implementation.
type Protocol struct {
// Name should contain the official protocol name,
// often a three-letter word.
Name string
// Version should contain the version number of the protocol.
Version uint
// Length should contain the number of message codes used
// by the protocol.
Length uint64
// Run is called in a new groutine when the protocol has been
// negotiated with a peer. It should read and write messages from
// rw. The Payload for each message must be fully consumed.
//
// The peer connection is closed when Start returns. It should return
// any protocol-level error (such as an I/O error) that is
// encountered.
Run func(peer *Peer南沙海战 , rw MsgReadWriter) error
}
Name,协议名称建议是三个小写的字母,以太坊中使用的是eth。
Version,版本号是整数,旧版本号需要小于新的版本号。
Length严家满 ,其值等于本协议定义的最大的msgCode+1。理解这个字段之前,先看下消息的定义:
type Msg struct {
Code uint64
Size uint32 // size of the paylod
Payload io.Reader
ReceivedAt time.Time
}
其中的Code定义了消息种类,用来区分不同的消息,应用层需要根据它从Payload中反序列化解析出相应的消息。相同名称版本不同的协议,旧版本的最大MessageCode不要超过新版本的最大值。Length同理。对于不同名称的协议和亲公主鲜橙 ,Code定义互不影响,可以相同,下文会分析低层实现。
反序列化使用的RLP,可以参考:以太坊源码学习—RLP编码
以太坊中协议的定义相关源码如下:
// Constants to match up protocol versions and messages
const (
eth62 = 62
eth63 = 63
)
// Official short name of the protocol used during capability negotiation.
var ProtocolName = "eth"
// Supported versions of the eth protocol (first is primary).
var ProtocolVersions = []uint{eth63关芝林 , eth62}
// Number of implemented message corresponding to different protocol versions.
var ProtocolLengths = []uint64{17, 8}
const ProtocolMaxMsgSize = 10 * 1024 * 1024 // Maximum cap on the size of a protocol message
// eth protocol message codes
const (
// Protocol messages belonging to eth/62
StatusMsg = 0x00
NewBlockHashesMsg = 0x01
TxMsg = 0x02
GetBlockHeadersMsg = 0x03
BlockHeadersMsg = 0x04
GetBlockBodiesMsg = 0x05
BlockBodiesMsg= 0x06
NewBlockMsg = 0x07
// Protocol messages belonging to eth/63
GetNodeDataMsg = 0x0d
NodeDataMsg = 0x0e
GetReceiptsMsg = 0x0f
ReceiptsMsg = 0x10
)
在eth62版本中,最大 code 是0x07,在eth63最大是0x10,所以对应的有varProtocolLengths=[]uint64{17,8}。
Run是协议的处理函数。该函数在链接建立的时候会调用一次,后续通过循环从MsgReadWriter中读取 Msg李依芮 。一般此函数的实现框架如下:
func Run(peer *p2p.Peer, ws p2p.MsgReadWriter) error {
addPeerCache(peer, ws)
for {
msg, err := ws.ReadMsg()
if err != nil {
return err
}
switch msg.Code {
case MessageCode1:
var myMessage []string
if err := msg.Decode(&myMessage); err != nil {
log.Println("decode msg err", err)
} else {
// TODO
}
default:
log.Println("unkown msg code")
}
}
return nil
}
对于不同名称的协议, msgCode 是可以相同的,这是底层通过对 msgCode 作一次线性映射做到的。首先对所有 Protocol 按名称和版本号进行排序,不同名称按字符串排序,相同名称不同版本再按版本号由小到大排序。每个 Protocol 都有一个偏移量 offset 稀世奇缘 ,它与Length的和作为下一个Protocol的偏移量徐成峰 。假设排序后的结果是p[1],p[2],p[3],...,p[n],p[n+1],...,则有:
p[n+1].offset = p[n].offset + p[n].Length
p[0].offset = baseProtocolLength
小于起始偏移量 baseProtocolLength 的 msgCode 是系统消息。发送消息时,小威向前冲 msgCode 会自动加上 offset,读取消息时则减去 offset 即可。以下是对应源码:
type protoRW struct {
Protocol
inchan Msg // receices read messages
closed <-chan struct{} // receives when peer is shutting down
wstart <-chan struct{} // receives when write may start
werr chan<- error // for write results
offset uint64
wMsgWriter
}
// 对message code作区间映射,这样不同协议允许定义相同的message code
func matchProtocols(protocols []Protocol, caps []Cap, rw MsgReadWriter) map[string]*protoRW {
sort.Sort(capsByNameAndVersion(caps))
offset := baseProtocolLength
result := make(map[string]*protoRW)
outer:
for _, cap := range caps {
for _, proto := range protocols {
if proto.Name == cap.Name && proto.Version == cap.Version {
// If an old protocol version matched个旧吧 , revert it
if old := result[cap.Name]; old != nil {
offset -= old.Length
}
// Assign the new match
result[cap.Name] = &protoRW{Protocol: proto, offset: offset, in: make(chan Msg), w: rw}
offset += proto.Length
continue outer
}
}
}
return result
}
对于名称相同,版本不同的协议,您可以阅读上面的代码自行体会理解。
devp2p 提供的上层应用接口非常简单,我们可以使用它构建自己的 p2p 应用。下一篇我会介绍 p2p 节点发现协议刘銮鸿 ,欢迎订阅本公众号。
参考
https://github.com/ethereum/go-ethereum/wiki/Peer-to-Peer