ふわっとテック日記

テック系のことをメインに書いていきます。go言語/typescript/javascriptが好きです。たまに趣味(筋トレ)の話や日常系の記事も書きたいな〜と思っています。

Go1.21でのコンパイル最適化

Go言語では、コンパイルをより最適化するための仕組みでPGO(Profile-Guided Optimization)というものがあります。

Go1.20ではユーザにテストしてもらうためのプレビュー機能としてリリースされましたが、Go1.21では一般利用が可能となっています。


詳しいPGOの使用解説はこちら

go.dev


概要

profileとは、Goコードを実際に実行した際のCPU/メモリなどのリソースの使用状況など、プログラムの実行の様子を情報として持っているファイルのことです。

本来コンパイラソースコードを元にコンパイルを行うため、実行環境でコードがどのように実行されるかは知り得ません。

しかしGoではプロファイリングを行うことで、実行環境でのコードの振る舞いをprofileとして記録し、それをコンパイラに引き渡すことができます。

PGOではコンパイル時にこのprofileを参照することにより、コンパイルを最適化します。


この記事では以下の記事に沿ってPGOのデモを行い、使い方や効果のほどを確かめてみます。

go.dev

デモ

プロファイリング対象のサーバ用意

まずプロファイリング対象のサーバがなければいけません。

以下のような、マークダウン形式のファイルをHTML形式に変換してレスポンスするサーバを起動するコードを利用します。

package main

import (
    "bytes"
    "io"
    "log"
    "net/http"
    _ "net/http/pprof"

    "gitlab.com/golang-commonmark/markdown"
)

func render(w http.ResponseWriter, r *http.Request) {
    if r.Method != "POST" {
        http.Error(w, "Only POST allowed", http.StatusMethodNotAllowed)
        return
    }

    src, err := io.ReadAll(r.Body)
    if err != nil {
        log.Printf("error reading body: %v", err)
        http.Error(w, "Internal Server Error", http.StatusInternalServerError)
        return
    }

    md := markdown.New(
        markdown.XHTMLOutput(true),
        markdown.Typographer(true),
        markdown.Linkify(true),
        markdown.Tables(true),
    )

    var buf bytes.Buffer
    if err := md.Render(&buf, src); err != nil {
        log.Printf("error converting markdown: %v", err)
        http.Error(w, "Malformed markdown", http.StatusBadRequest)
        return
    }

    if _, err := io.Copy(w, &buf); err != nil {
        log.Printf("error writing response: %v", err)
        http.Error(w, "Internal Server Error", http.StatusInternalServerError)
        return
    }
}

func main() {
    http.HandleFunc("/render", render)
    log.Printf("Serving on port 8080...")
    log.Fatal(http.ListenAndServe(":8080", nil))
}

機能としてはシンプルなサーバですが、net/http/pprofがimportされています。

これは、後にプロファイリングを行う際のためのエンドポイントをサーバに追加するためです。

/debug/pprofで始まる複数のエンドポイントがこのパッケージによりサーバに追加されます。今回は/debug/pprof/profileのみの利用となります。)


負荷生成プログラム

プロダクション環境であればサーバに定常的にリクエストが来るため、プロファイリングも簡単にできますが、開発環境でのサーバだとリクエストが来ないためプロファイリングもできません。

そのため、サーバに一定の負荷を与え続けるための負荷生成プログラムを作成します。


記事ではサーバを起動した後、以下コマンドでサーバに対し負荷を送るようにしています。

go run github.com/prattmic/markdown-pgo/load@latest


プロファイリング

負荷を送っている状態で、net/http/pprofパッケージで追加されたエンドポイントにアクセスしてプロファイリングを行います。

curl -o cpu.pprof "http://localhost:8080/debug/pprof/profile?seconds=30"

以下のようなコマンドでもプロファイリングできます。

go tool pprof http://localhost:8080/debug/pprof/profile?seconds=30


エンドポイントに付いているsecondsというパラメータは、プロファイリング時間を指定しています。

上記コマンドで生成されたファイルがprofileです。


ビルド

Goツールチェーンは、mainパッケージのディレクトリにdefault.pgoという名前のprofileファイルを見つけた場合、ビルド時に自動的にPGOを有効にします。

Go公式でも、生成したprofileはdefault.pgoに名前を変えることを推奨しています。

(違う名前が付いていても、ビルド時に-pgoオプションでprofileパスを指定することでPGO有効にすることは可能です)


default.pgoを置いてビルドすると、普通のビルドより若干時間がかかりました。


go version -m {ビルド済みバイナリ}で、該当バイナリがPGOでビルドされたことがわかります。

$ go version -m markdown.withpgo.exe markdown.withpgo.exe: go1.21.0
        path    example.com/markdown
        ........
        ........
        build   -compiler=gc
        build   CGO_ENABLED=1
        build   CGO_CFLAGS=
        build   CGO_CPPFLAGS=
        build   CGO_CXXFLAGS=
        build   CGO_LDFLAGS=
        build   GOARCH=arm64
        build   GOOS=darwin
        build   -pgo=/Users/test-user/go/src/example.com/markdown/default.pgo


ベンチマーク

PGO無し/有りのビルド済みバイナリのベンチマークをそれぞれ取ります。

go test github.com/prattmic/markdown-pgo/load -bench=. -count=40 -source $(pwd)/README.md > nopgo.txt
go test github.com/prattmic/markdown-pgo/load -bench=. -count=40 -source $(pwd)/README.md > withpgo.txt


golang.org/x/perf/cmd/benchstatパッケージを利用し、ベンチマークの比較を行います。

$ go install golang.org/x/perf/cmd/benchstat@latest
$ benchstat nopgo.txt withpgo.txt
goos: darwin
goarch: arm64
pkg: github.com/prattmic/markdown-pgo/load
       │  nopgo.txt  │            withpgo.txt             │
       │   sec/op    │   sec/op     vs base               │
Load-8   80.98µ ± 0%   79.34µ ± 1%  -2.03% (p=0.000 n=40)

PGO有りの方が、実行が2%強速くなっているようです。


Go1.21では、PGOによりCPU使用率が2~7%ほど改善するようです。

今後のリリースでもパフォーマンスの向上を図っていくとのことなので、注目してみたいと思います。

【Go1.21】ループ処理の新機能

8月にリリースされたGo1.21のループ処理の新仕様についての記事です。

Go1.21のリリースノートはこちら

tip.golang.org


Go言語のループでは

for i := range ~~~for i := 0; i < MAX; i++ { といったような形をよく使うと思います。

ここで使ってる変数iはループ変数と呼ばれるものですが、

go1.21からこのループ変数に新しい仕様が実装されました。


今までループ変数は1回初期化されるとループ処理全体で使いまわされるという仕様でした。

つまりループ処理での1回目のイテレーションから最後のイテレーションまで、

変数は同じアドレスのものがずっと使用されるということですね。

こういった仕様によりバグが生じることはそれほど頻繁にはないのですが、

それでもコードを書いた人の想定通りに動かないということが稀に起こっています。

それが、下記のようなケースです。

  • コードサンプル1
func main() {
    for i := 0; i < 5; i++ {
        f := func() { fmt.Println("This is number", i) }
        go loopFunc(f)
    }
}

func loopFunc(f func()) {
    f()
}

(ゴルーチンの排他制御コードは省略)


このコードを書いた人はおそらく、1~5の番号の出力がそれぞれ1度ずつ出力されると想定していることでしょう。↓↓

This is output 1
This is output 2
This is output 3
This is output 4
This is output 5
(順不同)

しかし実際の出力は次のようになります。↓↓

This is output 5
This is output 5
This is output 5
This is output 5
This is output 5

なぜでしょう?

冒頭で触れた通り、ループ変数はループ処理を通じて使いまわされます

イテレーションでゴルーチンとしてloopFunc関数を呼び出していますが、これらのゴルーチンが実行される頃にはループ処理は終わっています。

つまりループ変数iには、最後のイテレーション時の値である5が入っているのです。

こうして、出力時の数値は全て5になってしまうというわけです。


これを回避するための策として今までよく用いられてきたのが、イテレーション内でループ変数の値を別の変数にコピーして使う方法です。

↓こんな感じです。

  • コードサンプル2
func main() {
    for i := 0; i < 5; i++ {
        copied := i
        f := func() { fmt.Println("This is output", copied) }
        go loopFunc(f)
    }
}

func loopFunc(f func()) {
    f()
}
(ゴルーチンの排他制御コードは省略)

こうすることでループ変数の参照渡しを防げるのでシステムバグの防止になります。

ただこの値コピーをいちいち実装するのは非常に煩雑です。


そこでGo1.21において、

ループ変数をループ処理単位ではなく、イテレーションごとに作成することを選択できるようになりました


それがこちらのLoopvarExperimentという仕組みです。

github.com


仕組みは非常に単純で、

  • ビルドコマンドや go run コマンドに接頭辞 GOEXPERIMENT=loopvar をつけてコードを実行する

だけで、ループ変数がイテレーションごとに作成されるようになり、

コードサンプル1のようなコードが想定通りに動くようになります。

GOEXPERIMENT=loopvar go install mypackage
GOEXPERIMENT=loopvar go build mypackage
GOEXPERIMENT=loopvar go test mypackage
GOEXPERIMENT=loopvar go run mycode.go

↓GOEXPERIMENT=loopvarを使ったコードサンプル1の実行結果↓

This is output 0
This is output 3
This is output 4
This is output 2
This is output 1


この仕組みは現段階では暫定的な導入であるため、GOEXPERIMENTの指定がなければ今まで通りの挙動となります。


Googleによると、この方法でのコード実行によって、既存のシステムがエラーになるということはほぼほぼあり得ないようです。

Google曰く、「2023の5月初頭からこの新しいループをプロダクションツールチェーンに適用しているが、1つの問題も報告されていない」そうです。

forループのクロージャが引き起こすバグは、複雑なシステムを運用しているとデバッグも難しく、非常に厄介なものとなります。

ぜひこのLoopvarExperimentの機能を使用してみることをお勧めします。

User-Agent Client Hints - ユーザーエージェント削減の代替策

本記事は、こちらのドキュメントを一部翻訳して執筆しています。

wicg.github.io



従来のユーザーエージェントはアクセス元のデバイスやOSバージョンなどの情報を持ちます。

多くのシステムではユーザーエージェント文字列を元に、特定デバイスやOSからのリクエストを制限するなどの実装が行われてきました。

しかし、近年のプライバシー問題に対する世界的な意識の高まりにしたがって、cookie制限などの例に漏れずユーザーエージェントの情報も不用意な公開を防ぐ潮流が生まれてきました。

(例えば近い将来では、2023年の2月のChrome110のリリースに伴い、Androidのユーザーエージェント文字列でバージョン名の固定、デバイス名の匿名化が適用される予定となっています。)

この、プライバシーの観点からユーザーエージェント情報の公開を制限する仕組みの1つが、User-Agent Client Hints (UA-CH) と呼ばれるものです。

これはGoogle Chromeが実装している機能であり、各種ブラウザの対応も進んでいます。

従来のユーザーエージェントが段階的に廃止される中、リクエスト元の情報を得るものとして将来的な対応が不可欠のものとなっています。

User-Agent Client Hints (UA-CH) 概要

ざっくり言うと、UA-CHは個人の特定が不可能なレベルでユーザーエージェント情報の部分的な受け渡しを行う機能です。

UA-CH対応ブラウザで 任意のURLにリクエストを送る際、例えば以下のようなヘッダがHTTPリクエストで送信されます。

Sec-CH-UA: "Examplary Browser"; v="73", ";Not?A.Brand"; v="27"
Sec-CH-UA-Mobile: ?0
Sec-CH-UA-Platform: "Windows"

これらの情報は従来のユーザーエージェント情報の一部であり、個人特定を防ぐために断片的に渡されています。

部分的に情報を渡すことでプライバシー対策になるだけでなく、通信のパフォーマンス向上効果も得られます。

上記を受け取ったサーバは、以下のように Accept-CH ヘッダを付与してレスポンスを返すことにより、より詳細なユーザ情報をブラウザに求めることができます。

Accept-CH: Sec-CH-UA-Platform-Version

Accept-CHを受け取ったクライアントは、その値を踏まえ、今度は次のようなリクエストを送ります。

Sec-CH-UA: "Examplary Browser"; v="73", ";Not?A.Brand"; v="27"
Sec-CH-UA-Mobile: ?0
Sec-CH-UA-Platform: "Windows"
Sec-CH-UA-Platform-Version: "14.0.0"

このように、CH-UAでは、初回のリクエストでユーザ情報の部分的な情報のみが渡され、より詳細な情報をサーバが求める場合はオプトイン形式でブラウザから取得するという形をとります。

サーバはブラウザが許可する場合に限り、ブラウザからこれらの情報を取得できます。

ヘッダのやりとりはHTTPS通信でなければできないようになっています。また、同一originでない場合、Accept-CHを使ったヘッダのやりとりは行うことができません。

Accept-CHの使用はRFC8942(HTTP Client Hints)で定義されています。

このドキュメントにも記載がありますが、ブラウザは場合によっては求められた情報に対し、プライバシー保護のために空文字、もしくは架空の値を返すことがあります。

ヘッダフィールド

Sec-CH-UA

ユーザーエージェントのブランドとそのバージョンをリスト形式で返します。バージョンはメジャーバージョンのみが返されます。

low entropy(Accept-CHヘッダでのやりとりなしで初回リクエストで渡せる、秘匿性の比較的低い情報)です。

Sec-CH-UA-Arch

CPUのアーキテクチャを返します。

Sec-CH-UA-Bitness

CPUアーキテクチャのビット数を返します。

Sec-CH-UA-Full-Version

ユーザーエージェントのフルバージョンを返します。

このヘッダは非推奨となっており、将来的に廃止される予定です。代わりに下記のSec-CH-UA-Full-Version-Listの利用が推奨されています。

Sec-CH-UA-Full-Version-List

ユーザーエージェントのブランドごとのフルバージョン情報を、リスト形式で返します。

Sec-CH-UA-Mobile

モバイルデバイスであるかどうかの真偽値を返します。

low entropyであり、デフォルトで渡されます。

Sec-CH-UA-Model

モバイルデバイス名を返します。

Sec-CH-UA-Platform

OS名を返します。

low entropyであり、デフォルトで渡されます。

Sec-CH-UA-Platform-Version

OSバージョンを返します。

Sec-CH-UA-WoW64

Windowsの64bitか32bitかの情報を真偽値で返します。64bitのwindowsなら真となります。

UA-CH API

JavaScriptでのUA-CH用のAPIが用意されています。

developer.mozilla.org

getHighEntropyValues(hints)メソッド

非同期的にhigh entropyなヘッダを取得するjsメソッドです。求めている情報が返されない場合はPromiseがrefectされ、NotAllowedErrorが返されます。

NavigatorUAData.getHighEntropyValues() - Web APIs | MDN

gRPC触ってみた(4)

↓前回記事たち↓

gRPC触ってみた

gRPC触ってみた(2)

gRPC触ってみた(3)

本記事では、双方向ストリーミングを扱います。

双方向ストリーミングとは、複数リクエストに対して複数レスポンスを返す通信形式のことを言います。

クライアントから複数文字列を1秒ごとに送信し、それに対する文字列のレスポンスをリクエストごとに受け取る、といった簡易的なサーバを作ってみたいと思います。

まず、protoファイルは以下のように記述します。

message UploadAndGetProgressRequest {
    string name = 1;
}
message UploadAndGetProgressResponse {
    string progress = 1;
}

// サービス定義
service ListService {
    rpc UploadAndGetProgress (stream UploadAndGetProgressRequest) returns (stream UploadAndGetProgressResponse);
}

サービス定義で引数と返り値の両方にstreamキーワードを記述することで、該当rpcメソッドが双方向ストリーミング対応になります。

いつも通りprotocコマンドでgrpcソースコードを作成します。

protoc -I. --go_out=. --go-grpc_out=. *.proto

サーバ

以下、サーバで使用するstream用のインターフェースです。

type ListService_UploadAndGetProgressServer interface {
    Send(*UploadAndGetProgressResponse) error
    Recv() (*UploadAndGetProgressRequest, error)
    grpc.ServerStream
}

SendRecvメソッドでリクエストの取得やレスポンス送信ができる他、Serverstreamインターフェースのメソッド一式も持っています。

これを利用したサーバ側の実装例が以下のようになります。

func (*Server) UploadAndGetProgress(stream pb.ListService_UploadAndGetProgressServer) error {
    for {
        req, err := stream.Recv()
        if err != nil {
            if err == io.EOF {
                log.Println("Server stream finished.")
                return nil
            }
            return err
        }

        name := req.Name
        log.Println("Server received:", name)

        err = stream.Send(&pb.UploadAndGetProgressResponse{
            Progress: fmt.Sprintf("Progressed at the server: %s", name),
        })
        if err != nil {
            return err
        }
    }
}

EOFエラーが発生するまでRecvメソッドを繰り返し実行し、リクエストの取得を試みます。

リクエストを取得するたびにSendメソッドでメッセージをクライアントに返します。

クライアント

以下、クライアントで使用するstream用のインターフェースです。

type ListService_UploadAndGetProgressClient interface {
    Send(*UploadAndGetProgressRequest) error
    Recv() (*UploadAndGetProgressResponse, error)
    grpc.ClientStream
}

サーバ用のインターフェースと同様にSendRecvメソッドでリクエストの送信やレスポンス取得ができるようになっています。

また、ClientSteamインターフェースのメソッド一式も持っています。

これを利用したクライアント側の実装例が以下のようになります。

func uploadAndGetProgress(client pb.ListServiceClient) {
    stream, err := client.UploadAndGetProgress(context.Background())
    if err != nil {
        log.Fatal(err)
    }

    // 送信ゴルーチン
    go func() {
        for i := 0; i < 5; i++ {
            err = stream.Send(&pb.UploadAndGetProgressRequest{
                Name: fmt.Sprintf("Name%d", i),
            })
            if err != nil {
                log.Fatal(err)
            }
            time.Sleep(time.Second * 1)
        }

        err = stream.CloseSend()
        if err != nil {
            log.Fatal(err)
        }
    }()

    // 受信ゴルーチン
    ch := make(chan int)
    go func() {
        for {
            res, err := stream.Recv()
            if err != nil {
                if err == io.EOF {
                    log.Println("Client stream finished.")
                    break
                }
                log.Fatal(err)
            }

            progress := res.Progress
            log.Println("Client received:", progress)
        }
        close(ch)
    }()

    // 受信ゴルーチンが終了するまで待機
    <-ch
}

クライアントではゴルーチンを、以下の2つ実行しています。

ゴルーチン1つ目

リクエスト送信用のゴルーチンです。1秒の間隔を空けて、サーバに文字列を5回送信します。

送信が全て完了するとClientStreamインターフェースのCloseSendメソッドを実行し、クライアント側のクローズをサーバに知らせます。

ゴルーチン2つ目

レスポンス受信用のゴルーチンです。forで繰り返しレスポンスの取得を試みます。

サーバからEOFが返されると(サーバ側メソッドでreturn nilするとクライアントにEOFが渡されます)、forループを抜けてメインで初期化したチャネルをcloseします。

ゴルーチンが終了する前にメイン処理が終わってしまうのを防ぐため、メインでは<-ch部分でチャネルに値が入る、もしくはチャネルがcloseされるまでメイン処理をストップさせます。

受信ゴルーチンでチャネルがcloseされるとこの部分が実行され、メイン処理がそのまま終了するという流れになります。

gRPC触ってみた(3)

前回記事: gRPC触ってみた(2)

前前回記事: gRPC触ってみた

grpcのストリーム通信について

今回の記事では、grpcのストリーム通信を扱います。

grpcでは1リクエスト1レスポンス方式のUnary通信に加え、ストリーム通信を実装することができます。

ストリーム通信は以下の3通りあります。

サーバストリーミング

1リクエストに対して複数のレスポンスが返る通信形式です。

クライアントストリーミング

複数リクエストに対して1レスポンスが返る通信形式です。

双方向ストリーミング

複数リクエストに対して複数レスポンスが返る通信形式です。

サーバストリーミング

Protoファイルは以下のような実装となります。

message GetListRequest {
}
message GetListResponse {
    string name = 1;
}

service ListService {
    rpc GetList (GetListRequest) returns (stream GetListResponse);
}

サービス定義の返り値指定の箇所にstreamを加えているのがわかります。これにより対象のRPCメソッドがサーバストリーミング形式となります。

Protocコマンドでgoファイルを実装すると、サーバ用の以下のようなコードが生成されます。

func (UnimplementedListServiceServer) GetList(*GetListRequest, ListService_GetListServer) error {
    return status.Errorf(codes.Unimplemented, "method GetList not implemented")
}

RPCメソッドの引数にListService_GetListServerインターフェースを指定するようになっていますが、このインターフェースはSendメソッドを内包しており、このメソッドでレスポンスを返せるようになっています。

type ListService_GetListServer interface {
    Send(*GetListResponse) error
    grpc.ServerStream
}
...
func (x *listServiceGetListServer) Send(m *GetListResponse) error {
    return x.ServerStream.SendMsg(m)
}

サーバ側での実装例としては、下記のようになります。

func (*Server) GetList(req *pb.GetListRequest, stream pb.ListService_GetListServer) error {
    for i := 0; i < 5; i++ {
        err := stream.Send(&pb.GetListResponse{
            Name: fmt.Sprintf("This is list %d", i),
        })
        if err != nil {
            return err
        }
        log.Println("stream sent:", i)
        time.Sleep(1 * time.Second)
    }
    return nil
}

この例では5回forループを回し、毎ループでレスポンスを返しています。

最後にnilをreturnして通信をcloseさせています。

一方、クライアント側の生成コードは以下のようになります。

func (c *listServiceClient) GetList(ctx context.Context, in *GetListRequest, opts ...grpc.CallOption) (ListService_GetListClient, error) {
...
}

RPCメソッドの実行でListService_GetListClientインターフェースを返すのですが、このインターフェースはRecvメソッドを内包しており、このメソッドでサーバ側から返される複数のレスポンスを都度受け取ります。

type ListService_GetListClient interface {
    Recv() (*GetListResponse, error)
    grpc.ClientStream
}

type listServiceGetListClient struct {
    grpc.ClientStream
}

func (x *listServiceGetListClient) Recv() (*GetListResponse, error) {
    m := new(GetListResponse)
    if err := x.ClientStream.RecvMsg(m); err != nil {
        return nil, err
    }
    return m, nil
}

以上を踏まえたクライアント側の実装例としては、下記のようになります。

func getList(client pb.ListServiceClient) {
    stream, err := client.GetList(context.Background(), &pb.GetListRequest{})
    if err != nil {
        log.Fatal(err)
    }

    for {
        res, err := stream.Recv()
        if err != nil {
            if err == io.EOF {
                log.Println("stream finished.")
                return
            }
            log.Fatal(err)
        }

        log.Println("response:", res.Name)
    }
}

この例ではforで無限ループを回し、ループごとにRecvでレスポンス取得を試みます。

レスポンス取得できた場合は内容を出力し、サーバがデータを返しきった(io.EOFエラーが発生)した段階でクライアント処理も終了させています。

以上のサーバ、クライアントを実装してサーバ実行し、クライアントを実行すると以下のような出力がされます(1秒ごとに1行)。

$ go run client/main.go 
2022/11/17 02:11:01 response: This is list 0
2022/11/17 02:11:02 response: This is list 1
2022/11/17 02:11:03 response: This is list 2
2022/11/17 02:11:04 response: This is list 3
2022/11/17 02:11:05 response: This is list 4

クライアントストリーミング

Protoファイルは以下のような実装となります。

message UploadListRequest {
    string name = 1;
}
message UploadListResponse {
    string result = 1;
}

service ListService {
    rpc UploadList (stream UploadListRequest) returns (UploadListResponse);
}

今回はサーバストリーミングと異なり、サービス定義の引数の箇所にstreamを加えているのがわかります。

これにより対象のRPCメソッドがクライアントストリーミング形式となります。

Protocコマンドでgoファイルを実装すると、サーバ用の以下のようなコードが生成されます。

func (UnimplementedListServiceServer) UploadList(ListService_UploadListServer) error {
    return status.Errorf(codes.Unimplemented, "method UploadList not implemented")
}

RPCメソッドの引数にListService_UploadListServerインターフェースを指定するようになっていますが、このインターフェースはSendAndCloseメソッドとRecvを内包しています。

Recvメソッドはクライアントからの複数リクエストを都度受け取るために使用し、SendAndCloseはレスポンスを返すとともにストリームをcloseします。

type ListService_UploadListServer interface {
    SendAndClose(*UploadListResponse) error
    Recv() (*UploadListRequest, error)
    grpc.ServerStream
}
...
func (x *listServiceUploadListServer) SendAndClose(m *UploadListResponse) error {
    return x.ServerStream.SendMsg(m)
}

func (x *listServiceUploadListServer) Recv() (*UploadListRequest, error) {
    m := new(UploadListRequest)
    if err := x.ServerStream.RecvMsg(m); err != nil {
        return nil, err
    }
    return m, nil
}

サーバ側での実装例としては、下記のようになります。

func (*Server) UploadList(stream pb.ListService_UploadListServer) error {
    res := ""
    for {
        req, err := stream.Recv()
        if err != nil {
            if err == io.EOF {
                stream.SendAndClose(&pb.UploadListResponse{
                    Result: res,
                })
                log.Println("request:", res)
                log.Println("stream finished.")
                return nil
            }
            return err
        }
        res += fmt.Sprintf("%s\n", req.Name)
    }
}

この例では無限forループを回し、毎ループでリクエストの取得を試みます。

リクエストを取得する度にres変数に加えていき、クライアントからのリクエスト送信が終了する(io.EOFエラーを受け取る)とレスポンスを返却してストリームをcloseし、終了します。

一方、クライアント側の生成コードは以下のようになります。

func (c *listServiceClient) UploadList(ctx context.Context, opts ...grpc.CallOption) (ListService_UploadListClient, error) {
...
}

RPCメソッドの実行でListService_UploadListClientインターフェースを返すのですが、このインターフェースはSendメソッドとCloseAndRecvメソッドを内包しています。

Sendメソッドはリクエストを送信する際に使用し、CloseAndRecvメソッドはリクエスト送信の終了を意味するio.EOFエラーを送信するとともに、サーバからレスポンスを受け取るためのものです。

type ListService_UploadListClient interface {
    Send(*UploadListRequest) error
    CloseAndRecv() (*UploadListResponse, error)
    grpc.ClientStream
}
....
func (x *listServiceUploadListClient) Send(m *UploadListRequest) error {
    return x.ClientStream.SendMsg(m)
}

func (x *listServiceUploadListClient) CloseAndRecv() (*UploadListResponse, error) {
    if err := x.ClientStream.CloseSend(); err != nil {
        return nil, err
    }
    m := new(UploadListResponse)
    if err := x.ClientStream.RecvMsg(m); err != nil {
        return nil, err
    }
    return m, nil
}

以上を踏まえたクライアント側の実装例としては、下記のようになります。

func uploadList(client pb.ListServiceClient) {
    stream, err := client.UploadList(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    for i := 0; i < 5; i++ {
        err := stream.Send(&pb.UploadListRequest{
            Name: fmt.Sprintf("This is list %d", i),
        })
        if err != nil {
            log.Fatal(err)
        }
        log.Println("stream sent:", i)
        time.Sleep(1 * time.Second)
    }

    res, err := stream.CloseAndRecv()
    if err != nil {
        log.Fatal(err)
    }

    log.Println("response:", res.Result)
}

この例では5回forループを回し、ループごとにSendでリクエストを送ります。

リクエストを送りきった後にCloseAndRecvでリクエスト送信完了通知&レスポンスを取得します。

以上のサーバ、クライアントを実装してサーバ実行し、クライアントを実行すると、リクエストを送信しきった後に以下のような出力がされます。

2022/11/17 02:39:33 response: This is list 0
This is list 1
This is list 2
This is list 3
This is list 4

次回

次回の記事では、双方向ストリーミングについて扱おうと思います。

gRPC触ってみた(2)

前回記事: gRPC触ってみた


今回は、grpcのサーバ実装で様々なオプションを付与することができるServerOptionについて触れていきます。

ServerOption

grpcのサーバ側で使用するgrpc.Server構造体は、以下の関数を利用して初期化されます。

func NewServer(opt ...ServerOption) *Server {
    ....
}

引数に任意の数のServerOptionインターフェースを満たす構造体を渡すことで、credentialsやcodec、keepaliveを始めとして、様々な挙動をサーバー処理の前後に入れ込むことができます。

UnaryInterceptor / ChainUnaryInterceptor

例えば下記のUnaryInterceptor関数ではサーバ処理にフックを付与するUnaryServerInterceptorを引数に渡すことで、ServerOptionを返します。

これをNewServerに渡すことでUnaryServerInterceptorで定義した処理をリクエスト受け取り時に実行できるようになります。

grpc package - google.golang.org/grpc - Go Packages

こちらがUnaryServerInterceptorの定義です。

引数のhandler(UnaryHandler)はサービスメソッドのラッパーであり、UnaryServerInterceptor内で必ず実行する必要があります。

grpc package - google.golang.org/grpc - Go Packages

UnaryInterceptorの実装例です。

func main() {
    ....
    grpcServer := grpc.NewServer(
        grpc.UnaryInterceptor(
            logging(),
        ),
    )
    ....
}

func logging() grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
        log.Printf("サーバ処理前のログ")

        resp, err = handler(ctx, req) // handlerの実行は必須です。ここが本来のサーバ処理を実行しています。
        if err != nil {
            return nil, err
        }

        log.Printf("サーバ処理後のログ")

        return resp, nil
    }
}

logging()関数で返しているUnaryServerInterceptor内で、本来のサーバ処理に加えてlog.Printfでログ出し処理を追加しています。

UnaryInterceptor関数ではUnaryServerInterceptorを1つしか指定できませんが、ChainUnaryInterceptorを使用すれば複数のUnaryServerInterceptorを設定できます。

grpc package - google.golang.org/grpc - Go Packages

以下、ChainUnaryInterceptorの実装例です。

func main() {
    ....
    grpcServer := grpc.NewServer(
        grpc.ChainUnaryInterceptor(
            logging1(),
            logging2(),
        ),
    )
    ....
}

func logging1() grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
        log.Printf("サーバ処理前のログ1")

        resp, err = handler(ctx, req)
        if err != nil {
            return nil, err
        }

        log.Printf("サーバ処理後のログ1")

        return resp, nil
    }
}

func logging2() grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
        log.Printf("サーバ処理前のログ2")

        resp, err = handler(ctx, req)
        if err != nil {
            return nil, err
        }

        log.Printf("サーバ処理後のログ2")

        return resp, nil
    }
}

ChainUnaryInterceptorで指定する複数のUnaryServerInterceptorは、最初に指定したものがラッパーの一番外側、最後が一番内側になります。

なので上記の例だと、処理の順番は

ログ出力「サーバ処理前のログ1」
↓
ログ出力「サーバ処理前のログ2」
↓
サーバ処理
↓
ログ出力「サーバ処理後のログ2」
↓
ログ出力「サーバ処理後のログ1」

となります。

MaxSendMsgSize

MaxSendMsgSize関数は、サーバが返すデータの最大バイト数を指定するServerOptionを返します。

grpc package - google.golang.org/grpc - Go Packages

サーバが返そうとするデータサイズが指定バイト数よりも大きい場合、以下のようなエラーレスポンスを返します。

2022/11/03 14:48:13 rpc error: code = ResourceExhausted desc = grpc: trying to send message larger than max (109 vs. 1)

上記ではエラーコードがResourceExhaustedとなっていますが、grpcのステータスコードは以下のように、uint32のエイリアスであるCode型で表現されます。

codes package - google.golang.org/grpc/codes - Go Packages

サーバ側で任意のステータスコードを返したい時はこの中の定数から指定してあげれば良いでしょう。

HTTPのステータスコードとは異なるので注意してください。




今回は2例のみ挙げましたが、ServerOptionを設定できる関数は他にも色々あります。

割と柔軟な設定ができるので今後もいろいろと触っていきたいなーと思ってます。

おまけ: go-grpc-middleware

GitHub - grpc-ecosystem/go-grpc-middleware: Golang gRPC Middlewares: interceptor chaining, auth, logging, retries and more.

こちらのパッケージを使用することで各種Interceptorの実装が容易になり、お手軽なマイクロサービス構築が可能となります。

以下の例では、複数のUnaryServerInterceptorを1つにまとめることができるChainUnaryServerを実装しています。

import (
    ....
    grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
)

func main() {
    ....
    grpcServer := grpc.NewServer(
        grpc.UnaryInterceptor(
            grpc_middleware.ChainUnaryServer(
                logging1(),
                logging2(),
            ),
        ),
    )
    ....
}

func logging1() grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
        log.Printf("サーバ処理前のログ1")

        resp, err = handler(ctx, req)
        if err != nil {
            return nil, err
        }

        log.Printf("サーバ処理後のログ1")

        return resp, nil
    }
}

func logging2() grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
        log.Printf("サーバ処理前のログ2")

        resp, err = handler(ctx, req)
        if err != nil {
            return nil, err
        }

        log.Printf("サーバ処理後のログ2")

        return resp, nil
    }
}

gRPC触ってみた

gRPCを使用したサーバ/クライアント間の通信の構築をざっくりと書きます。

RPCとは

ネットワーク経由で、別ホスト上に存在するソフトウェアのプログラムを関数やメソッドを指定して呼び出す方法のことです。

関数やメソッド名をそのまま使って呼び出すため、可読性が高くなったり、別言語での実装が比較的容易になったりするのが特徴です。

gRPCとは

googleが開発したオープンソースのRPCシステムで、protocol buffersを使用してRPC通信を行います。

HTTP/2を使用した通信となります。

protocol buffersとは

googleが開発したIDL(インターフェース定義言語)です。

データのシリアライズフォーマットで、言語としての定義のしやすさや可読性が特徴となっています。

バイナリに変換するのでその状態での可読性はないものの、データサイズが小さく高速な通信が可能です。

環境構築

まずprotocをインストールします。データフォーマットを定義する.protoファイルから各言語のコードを生成するために必要となるコンパイラです。

brew install protobuf (macOS)

次に、以下2つをインストールします。

  1. protoc-gen-go(protocコマンドのプラグインで、protocol buffersコンパイラがgoコードを生成するためのもの。データのシリアライズ、デシリアライズなど)

  2. protoc-gen-go-grpc(.protoファイルのserviceに従って、gRPCのサーバ、クライアントを構築する)

このリンク先でインストールコマンドが記載されています。

Quick start | Go | gRPC

.protoファイルの記述

RPCでやりとりするデータの形式を定義するためのファイルです。基本的にこのファイルがAPI仕様書としての役割を果たし、サーバ、クライアントを実装する際の大元となります。

person.proto

syntax = "proto3"; // バージョン

package person; // パッケージ指定(名前空間を作成)

option go_package = "./pb"; // gRPCのコードを生成する際のディレクトリを指定。goファイル生成後、指定ディレクトリがパッケージとなる

import "other.proto"; // ほか.protoファイルのimportも可能

// message定義
message PersonRequest {
}
message PersonResponse {
    int32 age = 1;
    string name = 2;
}

// service定義
Service PersonService {
    rpc GetPerson (PersonRequest) returns (PersonResponse);
}

message

データを表す基本となる型。多くのデータを内包することができます。

フィールドの右側の番号はタグナンバーと呼ばれ、タグナンバーはprotocol buffersがフィールドを識別するための値です。message内で一意である必要があります。

タグナンバーは必ずしも連番にする必要はありません。

service

リクエスト/レスポンスで渡されるデータと共に定義されるメソッド群です。

Unary(1リクエスト、1レスポンス)通信とストリーム通信が定義可能ですが、上記ではUnary通信のメソッドを定義しています。

ストリーム通信はまた後に記事を書く予定です。

gRPCコンパイル

.protoファイルが書けたらいよいよgRPCのコンパイルを行います。.protoファイルを元に、サーバ/クライアントのソースコードを生成します。

以下のようなコマンドを実行します。

protoc -I. --go_out=. --go-grpc_out=. *.proto

-I

.protoファイルのパス

--go_out

出力先を指定

--go-grpc_out

gRPCのサーバ/クライアント用のソースコードを生成するためのオプション。生成するパスを指定


コマンド実行後、.protoファイルのoption go_package = xxxで指定したディレクトリ配下にgRPCのコードが生成されています。

xxx.goはmessageの定義や各種メソッド(フィールドの取得など)、xxx_grpc.goはサーバ/クライアントのためのコードファイルです。

サーバ実装

上記手順で生成されたperson_grpc.pb.goでは、サーバのインターフェースおよび構造体が次のように定義されています。

type PersonServiceServer interface {
    GetPerson(context.Context, *PersonRequest) (*PersonResponse, error)
    mustEmbedUnimplementedPersonServiceServer()
}

// UnimplementedPersonServiceServer must be embedded to have forward compatible implementations.
type UnimplementedPersonServiceServer struct {
}

func (UnimplementedPersonServiceServer) GetPerson(context.Context, *PersonRequest) (*PersonResponse, error) {
    return nil, status.Errorf(codes.Unimplemented, "method GetPerson not implemented")
}
func (UnimplementedPersonServiceServer) mustEmbedUnimplementedPersonServiceServer() {}

このPersonServiceServerインターフェースを実装することでサーバ機能を作っていきます。

このインターフェースを満たす構造体はすでにUnimplementedPersonServiceServerとして定義されているので、これをそのまま使います。

サーバ

package main

import (
    "context"
    "grpc_demo/pb"
    "log"
    "net"

    "grpc_demo/pb"

    "google.golang.org/grpc"
)

type Server struct {
    pb.UnimplementedPersonServiceServer
}

func main() {
    listener, err := net.Listen("tcp", ":8080")
    if err != nil {
        log.Fatal(err)
    }

    grpcServer := grpc.NewServer()
    personServer := &Server{}

    pb.RegisterPersonServiceServer(grpcServer, personServer)

    if err = grpcServer.Serve(listener); err != nil {
        log.Fatal(err)
    }
}

func (*Server) GetPerson(ctx context.Context, req *pb.PersonRequest) (*pb.PersonResponse, error) {
    res := &pb.PersonResponse{
        Age:  20,
        Name: "John",
    }
    return res, nil
}

このような感じで実装します。

gRPCで定義したGetPersonを実装しています(UnimplementedPersonServiceServerをそのまま使うだけではメソッドが実装されていません)。

クライアント

下記のようにPersonServiceClientインターフェース、およびpersonServiceClient構造体がperson_grpc.pb.goに実装されているので、これを使って上記のサーバにリクエストを行います。

type PersonServiceClient interface {
    GetPerson(ctx context.Context, in *PersonRequest, opts ...grpc.CallOption) (*PersonResponse, error)
}

type personServiceClient struct {
    cc grpc.ClientConnInterface
}

func NewPersonServiceClient(cc grpc.ClientConnInterface) PersonServiceClient {
    return &personServiceClient{cc}
}

func (c *personServiceClient) GetPerson(ctx context.Context, in *PersonRequest, opts ...grpc.CallOption) (*PersonResponse, error) {
    out := new(PersonResponse)
    err := c.cc.Invoke(ctx, "/employee.PersonService/GetPerson", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

下記のような感じで実装をします。

クライアント

package main

import (
    "context"
    "fmt"
    "grpc_demo/pb"
    "log"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
)

func main() {
    conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatal(err)
    }
    client := pb.NewPersonServiceClient(conn)

    getPerson(client)
}

func getPerson(client pb.PersonServiceClient) {
    res, err := client.GetPerson(context.Background(), &pb.PersonRequest{})
    if err != nil {
        log.Fatal(err)
    }

    age := res.Age
    name := res.Name
    fmt.Printf("response: age=%d, name=%s", age, name)
}

これで上記のサーバファイルを実行し、クライアントファイルを実行すると.protoファイルで定めたGetPersonが発火し、レスポンスが返ります。

次回の記事ではgRPCのより詳細な設定や、ストリーム通信についても書いていく予定です。