grpc和protobuf

什么是rpc

RPC(Remote Procedure Call),翻译成中文叫远程过程调用。其设计思路是程序A可以像支持本地函数一样调用远程程序B的一个函数。程序A和程序B很可能不在一台机器上,这中间就需要网络通讯,一般都是基于tcp或者http协议。函数有入参和返回值这里就需要约定一致的序列化方式,比较常见的有binary,json,xml。函数的调用过程可以同步和异步。随着分布式程序近几年的大行其道,rpc作为其主要的通讯方式也越来越进入我们的视野,比如grpc,thrift。

go标准库的rpc

golang的标准库就支持rpc。

server端

package main import ( "errors" "log" "net" "net/rpc" ) type Param struct { A,B int } type Server struct {} func (t *Server) Multiply(args *Param, reply *int) error { *reply = args.A * args.B return nil } func (t *Server) Divide(args *Param, reply *int) error { if args.B == 0 { return errors.New("divide by zero") } *reply = args.A / args.B return nil } func main() { rpc.RegisterName("test", new(Server)) listener, err := net.Listen("tcp", ":9700") if err != nil { log.Fatal("ListenTCP error:", err) } for { conn, err := listener.Accept() if err != nil { log.Fatal("Accept error:", err) } go rpc.ServeConn(conn) } }

client端

package main import ( "fmt" "log" "net/rwida/micro-server "time"wida/micro-server ) type Param struct { A,B int } func main() { client, err := rpc.Dial("tcp", "localhost:9700") if err != nil { log.Fatal("dialing:", err) } //同步调用 var reply int err = client.Call("test.Multiply", Param{34,35}, &reply) if err != nil { log.Fatal(err) } fmt.Println(reply) //异步调用 done := make(chan *rpc.Call, 1) client.Go("test.Divide", Param{34,17}, &reply,done) select { case d := <-done: fmt.Println(* d.Reply.(*int)) case <-time.After(3e9): fmt.Println("time out") } }
$ go run server/main.go & $ go run client/main.go 1190 2

程序中我们没有看到参数的序列化和反序列化过程,实际上go标准库rpc使用encoding/gob做了序列化和反序列化操作,但是encoding/gob只支持go语言内部做交互,如果需要夸语言的话就不能用encoding/gob了。我们还可以使用标准库中的jsonrpc.

server

package main import ( "errors" "log" "net" "net/rpc" "net/rpc/jsonrpc" ) type Param struct { A,B int } type Server struct {} func (t *Server) Multiply(args *Param, reply *int) error { *reply = args.A * args.B return nil } func (t *Server) Divide(args *Param, reply *int) error { if args.B == 0 { return errors.New("divide by zero") } *reply = args.A / args.B return nil } func main() { rpc.RegisterName("test", new(Server)) listener, err := net.Listen("tcp", ":9700") if err != nil { log.Fatal("ListenTCP error:", err) } for { conn, err := listener.Accept() if err != nil { log.Fatal("Accept error:", err) } go rpc.ServeCodec(jsonrpc.NewServerCodec(conn)) } }
$ go run server/main.go & $ echo -e '{"method":"test.Multiply","params":[{"A":34,"B":35}],"id":0}' | nc localhost 9700 {"id":0,"result":1190,"error":null} $ echo -e '{"method":"test.Divide","params":[{"A":34,"B":17}],"id":1}' | nc localhost 9700 {"id":0,"result":1190,"error":null}

这个例子中我们用go写了服务端,客户端我们使用linux 命令行工具nc(natcat可能需要单独安装)直接和服务端交互。可以看到交互的序列化方式是json,因此其它语言也很容易实现和该服务端的交互。

当然我们上面演示的是基于tcp的rpc方式,标准库同时也支持http协议的rpc,感兴趣的同学可以去了解下,这边就不做展开介绍。

GRPC

上面我们发了一些篇幅介绍了标准库的rpc,主要的目的是了解rpc是什么。实际的开发中我们反而比较少用标准库的rpc,我们通常会选择grpc,grpc不仅仅在go生态里头非常流行,在其他语言生态里头同样也非常流行。

grpc是google公司开发的基于http2协议设计和protobuf开发的,高性能,跨语言的rpc框架。这边着重介绍下http2的一个重要特性当tcp多路复用功能,说得直白点就是一个在tcp链接执行多个请求,所以Service A提供N多个服务,Client B和A的所有交互都只用一个链接,这样子可以省很多的链接资源。对tcp连接复用( connection multiplexing)感兴趣的同学可以阅读下yamux的源码。

protobuf

protobuf是google开发的一种平台中立,语言中立,可拓展的数据描述语言。类似json,xml等数据描述语言类似,proto也实现了自己的序列化和反序列化方式,对相对于json来说,protobuf序列化效率更高,体积更小。官方地址protobuf有兴趣的同学可以看看。

protobuf安装

我们可以从protobuf/releases里头下载到相应的二进制版本安装protobuf。比如

protoc-3.10.0-win32.zip protoc-3.10.0-linux-x86_64.zip

解压文件到文件夹,然后将该文件目录添加到环境变量PATH中。

$ protoc --version libprotoc 3.10.0

ok protobuf就安装成功了。

安装protobuf go插件

go get -u -v github.com/golang/protobuf/proto go get -u -v github.com/golang/protobuf/protoc-gen-go

hello world

新建一个go项目

├── client │   └── main.go ├── pb │   ├── gen.sh │   └── search.proto └── server └── main.go

protobuf 消息定义文件为 pb/search.proto

syntax = "proto3"; package pb; service Searcher { rpc Search (SearchRequest) returns (SearchReply) {} } message SearchRequest { bool name = 1; } message SearchReply { string name = 1; }

这个文件我定义了rpc的服务Searcher,里头定了一个接口Search,同时定义了入参和返回值类型。

我们写一个shell脚本gen.sh来生成go程序文件

#!/bin/bash PROTOC=`which protoc` $PROTOC --go_out=plugins=grpc:. search.proto

cd 到pb文件夹执行 gen.sh 脚本

$ ./gen.sh $ cd ../ && tree ├── client │   └── main.go ├── pb │   ├── gen.sh │   ├── search.pb.go │   └── search.proto └── server └── main.go

可以看到我们多了一个 search.pb.go的文件。 我们在server文件下写我们服务main.go

package main import ( "log" "net" "context" "github.com/widaT/gorpc_demo/grpc/pb" "google.golang.org/grpc" ) const ( port = ":50051" ) type server struct{} func (s *server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.SearchReply, error) { return &pb.SearchReply{Name:"hello " + in.GetName()}, nil } func main() { lis, err := net.Listen("tcp", port) if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer() pb.RegisterSearcherServer(s, &server{}) s.Serve(lis) }

我们在client文件下写我们的客户端main.go

package main import ( "context" "fmt" "github.com/widaT/gorpc_demo/grpc/pb" "google.golang.org/grpc" "log" "time" ) const ( address = "localhost:50051" ) func main() { // Set up a connection to the server. conn, err := grpc.Dial(address, grpc.WithInsecure()) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() c := pb.NewSearcherClient(conn) s := time.Now() r, err := c.Search(context.Background(), &pb.SearchRequest{Name: "world"}) if err != nil { log.Fatalf("could not greet: %v", err) } fmt.Println(r , time.Now().Sub(s)) }
$ go run server/main.go & $ go run client/main.go & name:"hello world" 14.767013ms

grpc stream

上面的hello world程序演示了grpc的一般用法,这种方式能满足大部分的场景。grpc还提供了双向或者单向流的方式我们成为grpc stream,stream的方式一般用在有大量的数据交互或者长时间交互。

我们修改的grpc服务定义文件pb/search.proto,在service增加 rpc Search2 (stream SearchRequest) returns (stream SearchReply) {}

syntax = "proto3"; package pb; service Searcher { rpc Search (SearchRequest) returns (SearchReply) {} rpc Search2 (stream SearchRequest) returns (stream SearchReply) {} } message SearchRequest { string name = 1; } message SearchReply { string name = 1; }

服务端我们修改代码,添加一个func (s *server) Search2(pb.Searcher_Search2Server) error的一个实现方法。

package main import ( "io" "log" "net" "context" "github.com/widaT/gorpc_demo/grpc/pb" "google.golang.org/grpc" ) const ( port = ":50051" ) type server struct{} func (s *server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.SearchReply, error) { return &pb.SearchReply{Name:"hello " + in.GetName()}, nil } func (s *server) Search2(stream pb.Searcher_Search2Server) error { for { args, err := stream.Recv() if err != nil { if err == io.EOF { return nil } return err } reply := &pb.SearchReply{ Name:"hello:" + args.GetName()} err = stream.Send(reply) if err != nil { return err } } return nil } func main() { lis, err := net.Listen("tcp", port) if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer() pb.RegisterSearcherServer(s, &server{}) s.Serve(lis) }

为了方便阅读代码,client代码我们重写

package main import ( "context" "fmt" "github.com/widaT/gorpc_demo/grpc/pb" "google.golang.org/grpc" "io" "log" "time" ) const ( address = "localhost:50051" ) func main() { conn, err := grpc.Dial(address, grpc.WithInsecure()) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() c := pb.NewSearcherClient(conn) stream, err := c.Search2(context.Background()) if err != nil { log.Fatal(err) } go func() { //这边启动一个goroutine 发送请求 for { if err := stream.Send(&pb.SearchRequest{Name: "world"}); err != nil { log.Fatal(err) } time.Sleep(time.Second) } }() for { //主goroutine 一直接收结果 rep, err := stream.Recv() if err != nil { if err == io.EOF { break } log.Fatal(err) } fmt.Println(rep.GetName()) } }

运行代码

$ go run server/main.go & $ go run client/main.go hello:world hello:world hello:world ...

总结

本文只是对golang使用grpc做了简要的介绍,grpc的使用范围很广,需要我们持续了解和学习。 下面推荐个资料

参考文档