Píšu celkem jednoduchou aplikaci pro práci nad Git repozitáři v GitHubu. Jedním z požadavků je perzistování nagrebovaných meta informací do MongoDB pro další analytické zpracování.
Úzkým hrdlem je práce se samotnou databázi, kde paralelně (ok, konkurenčně) asynchronní zpracování získaných dat pak brzí samotná sdílená konektivita. Aby se mi jednotlivé gorutiny neblokovaly na sdílené globální konektivitě, napsal jsem si jednoduchý connection pool, který mi vytváří a poskytuje nezávislé fyzické konektivity, které pak předávám gorutinám, a tak odblokovávám špunt na globální konektivitě.
Díky connection poolu pak můžu paralelně spouštět jednotlive funkce bez toho, že by si mi v rámci konektivity navzájem brzdily.
GitHub
Svoji implementaci connection poolu jsem zveřejnil pod MIT licencí na Githubu. Repo najdete zde.
main.go
package main import ( "context" "fmt" "mcp-test/mcp" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" ) func main() { // create mongo connection pool with 20 independent connections cp, err := mcp.Create("mongodb://localhost:27017", 20) if err != nil { fmt.Println(err) } fmt.Println("connections pool length:", cp.Length()) // get single connection from connection pool c, i := cp.GetRandom() fmt.Println("connection id:", i) // get data through geted connection getData(c, "test", "users") // close all created connections in pool cp.Destroy() } // getData is helper function for getting examples data from db func getData(c *mongo.Client, db, coll string) { cur, err := c. Database(db). Collection(coll). Find(context.Background(), bson.D{}) if err != nil { fmt.Println("ERR") fmt.Println(err) } defer cur.Close(context.Background()) for cur.Next(context.Background()) { var result interface{} err := cur.Decode(&result) if err != nil { fmt.Println(err) } fmt.Println(result) } }
Asi nejzajímavější je funkce Create
, která vytváří samotný connection pool, a pak z něj pomocí funkce GetRandom
získávám jednotlovou konektivitu, která je pak už klasicky použitelná jako běžný MongoDB kient. Vedle funce GetRandom
modul disponuje funkcemi GetRoundRobin
, která vrací z poolu konektivity stále dokola dle pořadí, a pak funkci Get
, která umí parametrem určit, ktetá konkrétní konektivita z poolu se má použít.
mcp.go
package mcp import ( "context" "errors" "math/rand" "sync" "time" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) type mcp struct { mutex *sync.Mutex count uint last uint murl string con []*mongo.Client } // createConnection internal func for raw MongoDB connection func createConnection(murl string) (*mongo.Client, error) { client, err := mongo. Connect(context.Background(), options. Client(). ApplyURI(murl)) if err != nil { return nil, err } err = client.Ping(context.Background(), nil) if err != nil { return nil, err } return client, err } // Create MongoDB connections pool with 'n' connections func Create(murl string, n int) (mcp, error) { var wg sync.WaitGroup cp := mcp{murl: murl, count: 0, last: 0} cp.mutex = &sync.Mutex{} wg.Add(n) for i := 0; i < n; i++ { go func() { defer wg.Done() nc, err := createConnection(cp.murl) if err == nil { cp.mutex.Lock() cp.con = append(cp.con, nc) cp.count++ cp.mutex.Unlock() } }() } wg.Wait() return cp, nil } // Length return size of MongoDB connections pool func (cp *mcp) Length() uint { return cp.count } // Close single connection from pool func (cp *mcp) Close(n uint) error { if n >= 0 && n < cp.count { err := cp.con[n].Ping(context.Background(), nil) if err != nil { return err } return cp.con[n].Disconnect(context.Background()) } return errors.New("try to close not exists client") } // Destroy (close) all conections in pool func (cp *mcp) Destroy() { var i uint for i = 0; i < cp.count; i++ { cp.Close(i) } } // Get single connection from connections pool func (cp *mcp) Get(n uint) *mongo.Client { if n < 0 || n > cp.count { return nil } err := cp.con[n].Ping(context.Background(), nil) if err != nil { c, err := createConnection(cp.murl) if err != nil { return nil } cp.con[n] = c } return cp.con[n] } // GetRandom single connection from connections pool func (cp *mcp) GetRandom() (*mongo.Client, uint32) { rand.Seed(time.Now().UnixNano()) i := rand.Intn(int(cp.count)) return cp.con[i], uint32(i) } // GetRoundRobin gets single connection from connections pool by round robin func (cp *mcp) GetRoundRobin() (*mongo.Client, uint32) { cp.mutex.Lock() defer cp.mutex.Unlock() cp.last++ if cp.last == cp.count { cp.last = 0 } return cp.con[cp.last], uint32(cp.last) }
Výsledek
Pak může vypadat takto:
PS
Dobré podotknout, že technika, vzor, connection poolu je přenosný na jakýkoliv jiný programovací jazyk. Na implementaci v Go je hezké to, že se dá práce s databází pěkně paralelizovat a tak velice jednoduše performance nafukovat.