ふわっとテック日記

テック系のことをメインに書いていきます。go言語/typescript/javascriptが好きです。たまに趣味(筋トレ)の話や日常系の記事も書きたいな〜と思っています。

gRPC触ってみた(3)

前回記事: gRPC触ってみた(2)

前前回記事: gRPC触ってみた

grpcのストリーム通信について

今回の記事では、grpcのストリーム通信を扱います。

grpcでは1リクエスト1レスポンス方式のUnary通信に加え、ストリーム通信を実装することができます。

ストリーム通信は以下の3通りあります。

サーバストリーミング

1リクエストに対して複数のレスポンスが返る通信形式です。

クライアントストリーミング

複数リクエストに対して1レスポンスが返る通信形式です。

双方向ストリーミング

複数リクエストに対して複数レスポンスが返る通信形式です。

サーバストリーミング

Protoファイルは以下のような実装となります。

message GetListRequest {
}
message GetListResponse {
    string name = 1;
}

service ListService {
    rpc GetList (GetListRequest) returns (stream GetListResponse);
}

サービス定義の返り値指定の箇所にstreamを加えているのがわかります。これにより対象のRPCメソッドがサーバストリーミング形式となります。

Protocコマンドでgoファイルを実装すると、サーバ用の以下のようなコードが生成されます。

func (UnimplementedListServiceServer) GetList(*GetListRequest, ListService_GetListServer) error {
    return status.Errorf(codes.Unimplemented, "method GetList not implemented")
}

RPCメソッドの引数にListService_GetListServerインターフェースを指定するようになっていますが、このインターフェースはSendメソッドを内包しており、このメソッドでレスポンスを返せるようになっています。

type ListService_GetListServer interface {
    Send(*GetListResponse) error
    grpc.ServerStream
}
...
func (x *listServiceGetListServer) Send(m *GetListResponse) error {
    return x.ServerStream.SendMsg(m)
}

サーバ側での実装例としては、下記のようになります。

func (*Server) GetList(req *pb.GetListRequest, stream pb.ListService_GetListServer) error {
    for i := 0; i < 5; i++ {
        err := stream.Send(&pb.GetListResponse{
            Name: fmt.Sprintf("This is list %d", i),
        })
        if err != nil {
            return err
        }
        log.Println("stream sent:", i)
        time.Sleep(1 * time.Second)
    }
    return nil
}

この例では5回forループを回し、毎ループでレスポンスを返しています。

最後にnilをreturnして通信をcloseさせています。

一方、クライアント側の生成コードは以下のようになります。

func (c *listServiceClient) GetList(ctx context.Context, in *GetListRequest, opts ...grpc.CallOption) (ListService_GetListClient, error) {
...
}

RPCメソッドの実行でListService_GetListClientインターフェースを返すのですが、このインターフェースはRecvメソッドを内包しており、このメソッドでサーバ側から返される複数のレスポンスを都度受け取ります。

type ListService_GetListClient interface {
    Recv() (*GetListResponse, error)
    grpc.ClientStream
}

type listServiceGetListClient struct {
    grpc.ClientStream
}

func (x *listServiceGetListClient) Recv() (*GetListResponse, error) {
    m := new(GetListResponse)
    if err := x.ClientStream.RecvMsg(m); err != nil {
        return nil, err
    }
    return m, nil
}

以上を踏まえたクライアント側の実装例としては、下記のようになります。

func getList(client pb.ListServiceClient) {
    stream, err := client.GetList(context.Background(), &pb.GetListRequest{})
    if err != nil {
        log.Fatal(err)
    }

    for {
        res, err := stream.Recv()
        if err != nil {
            if err == io.EOF {
                log.Println("stream finished.")
                return
            }
            log.Fatal(err)
        }

        log.Println("response:", res.Name)
    }
}

この例ではforで無限ループを回し、ループごとにRecvでレスポンス取得を試みます。

レスポンス取得できた場合は内容を出力し、サーバがデータを返しきった(io.EOFエラーが発生)した段階でクライアント処理も終了させています。

以上のサーバ、クライアントを実装してサーバ実行し、クライアントを実行すると以下のような出力がされます(1秒ごとに1行)。

$ go run client/main.go 
2022/11/17 02:11:01 response: This is list 0
2022/11/17 02:11:02 response: This is list 1
2022/11/17 02:11:03 response: This is list 2
2022/11/17 02:11:04 response: This is list 3
2022/11/17 02:11:05 response: This is list 4

クライアントストリーミング

Protoファイルは以下のような実装となります。

message UploadListRequest {
    string name = 1;
}
message UploadListResponse {
    string result = 1;
}

service ListService {
    rpc UploadList (stream UploadListRequest) returns (UploadListResponse);
}

今回はサーバストリーミングと異なり、サービス定義の引数の箇所にstreamを加えているのがわかります。

これにより対象のRPCメソッドがクライアントストリーミング形式となります。

Protocコマンドでgoファイルを実装すると、サーバ用の以下のようなコードが生成されます。

func (UnimplementedListServiceServer) UploadList(ListService_UploadListServer) error {
    return status.Errorf(codes.Unimplemented, "method UploadList not implemented")
}

RPCメソッドの引数にListService_UploadListServerインターフェースを指定するようになっていますが、このインターフェースはSendAndCloseメソッドとRecvを内包しています。

Recvメソッドはクライアントからの複数リクエストを都度受け取るために使用し、SendAndCloseはレスポンスを返すとともにストリームをcloseします。

type ListService_UploadListServer interface {
    SendAndClose(*UploadListResponse) error
    Recv() (*UploadListRequest, error)
    grpc.ServerStream
}
...
func (x *listServiceUploadListServer) SendAndClose(m *UploadListResponse) error {
    return x.ServerStream.SendMsg(m)
}

func (x *listServiceUploadListServer) Recv() (*UploadListRequest, error) {
    m := new(UploadListRequest)
    if err := x.ServerStream.RecvMsg(m); err != nil {
        return nil, err
    }
    return m, nil
}

サーバ側での実装例としては、下記のようになります。

func (*Server) UploadList(stream pb.ListService_UploadListServer) error {
    res := ""
    for {
        req, err := stream.Recv()
        if err != nil {
            if err == io.EOF {
                stream.SendAndClose(&pb.UploadListResponse{
                    Result: res,
                })
                log.Println("request:", res)
                log.Println("stream finished.")
                return nil
            }
            return err
        }
        res += fmt.Sprintf("%s\n", req.Name)
    }
}

この例では無限forループを回し、毎ループでリクエストの取得を試みます。

リクエストを取得する度にres変数に加えていき、クライアントからのリクエスト送信が終了する(io.EOFエラーを受け取る)とレスポンスを返却してストリームをcloseし、終了します。

一方、クライアント側の生成コードは以下のようになります。

func (c *listServiceClient) UploadList(ctx context.Context, opts ...grpc.CallOption) (ListService_UploadListClient, error) {
...
}

RPCメソッドの実行でListService_UploadListClientインターフェースを返すのですが、このインターフェースはSendメソッドとCloseAndRecvメソッドを内包しています。

Sendメソッドはリクエストを送信する際に使用し、CloseAndRecvメソッドはリクエスト送信の終了を意味するio.EOFエラーを送信するとともに、サーバからレスポンスを受け取るためのものです。

type ListService_UploadListClient interface {
    Send(*UploadListRequest) error
    CloseAndRecv() (*UploadListResponse, error)
    grpc.ClientStream
}
....
func (x *listServiceUploadListClient) Send(m *UploadListRequest) error {
    return x.ClientStream.SendMsg(m)
}

func (x *listServiceUploadListClient) CloseAndRecv() (*UploadListResponse, error) {
    if err := x.ClientStream.CloseSend(); err != nil {
        return nil, err
    }
    m := new(UploadListResponse)
    if err := x.ClientStream.RecvMsg(m); err != nil {
        return nil, err
    }
    return m, nil
}

以上を踏まえたクライアント側の実装例としては、下記のようになります。

func uploadList(client pb.ListServiceClient) {
    stream, err := client.UploadList(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    for i := 0; i < 5; i++ {
        err := stream.Send(&pb.UploadListRequest{
            Name: fmt.Sprintf("This is list %d", i),
        })
        if err != nil {
            log.Fatal(err)
        }
        log.Println("stream sent:", i)
        time.Sleep(1 * time.Second)
    }

    res, err := stream.CloseAndRecv()
    if err != nil {
        log.Fatal(err)
    }

    log.Println("response:", res.Result)
}

この例では5回forループを回し、ループごとにSendでリクエストを送ります。

リクエストを送りきった後にCloseAndRecvでリクエスト送信完了通知&レスポンスを取得します。

以上のサーバ、クライアントを実装してサーバ実行し、クライアントを実行すると、リクエストを送信しきった後に以下のような出力がされます。

2022/11/17 02:39:33 response: This is list 0
This is list 1
This is list 2
This is list 3
This is list 4

次回

次回の記事では、双方向ストリーミングについて扱おうと思います。