Píšu celkem jednoduchou aplikaci pro práci nad Git repozitáři v GitHubu. Jedním z požadavků je perzistování nagrebovaných meta informací do MongoDB pro další analytické zpracování.
Úzkým hrdlem je práce se samotnou databázi, kde paralelně (ok, konkurenčně) asynchronní zpracování získaných dat pak brzí samotná sdílená konektivita. Aby se mi jednotlivé gorutiny neblokovaly na sdílené globální konektivitě, napsal jsem si jednoduchý connection pool, který mi vytváří a poskytuje nezávislé fyzické konektivity, které pak předávám gorutinám, a tak odblokovávám špunt na globální konektivitě.

Díky connection poolu pak můžu paralelně spouštět jednotlive funkce bez toho, že by si mi v rámci konektivity navzájem brzdily.
GitHub
Svoji implementaci connection poolu jsem zveřejnil pod MIT licencí na Githubu. Repo najdete zde.
main.go
package main
import (
"context"
"fmt"
"mcp-test/mcp"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)
func main() {
// create mongo connection pool with 20 independent connections
cp, err := mcp.Create("mongodb://localhost:27017", 20)
if err != nil {
fmt.Println(err)
}
fmt.Println("connections pool length:", cp.Length())
// get single connection from connection pool
c, i := cp.GetRandom()
fmt.Println("connection id:", i)
// get data through geted connection
getData(c, "test", "users")
// close all created connections in pool
cp.Destroy()
}
// getData is helper function for getting examples data from db
func getData(c *mongo.Client, db, coll string) {
cur, err := c.
Database(db).
Collection(coll).
Find(context.Background(), bson.D{})
if err != nil {
fmt.Println("ERR")
fmt.Println(err)
}
defer cur.Close(context.Background())
for cur.Next(context.Background()) {
var result interface{}
err := cur.Decode(&result)
if err != nil {
fmt.Println(err)
}
fmt.Println(result)
}
}
Asi nejzajímavější je funkce Create, která vytváří samotný connection pool, a pak z něj pomocí funkce GetRandom získávám jednotlovou konektivitu, která je pak už klasicky použitelná jako běžný MongoDB kient. Vedle funce GetRandom modul disponuje funkcemi GetRoundRobin, která vrací z poolu konektivity stále dokola dle pořadí, a pak funkci Get, která umí parametrem určit, ktetá konkrétní konektivita z poolu se má použít.
mcp.go
package mcp
import (
"context"
"errors"
"math/rand"
"sync"
"time"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
type mcp struct {
mutex *sync.Mutex
count uint
last uint
murl string
con []*mongo.Client
}
// createConnection internal func for raw MongoDB connection
func createConnection(murl string) (*mongo.Client, error) {
client, err := mongo.
Connect(context.Background(), options.
Client().
ApplyURI(murl))
if err != nil {
return nil, err
}
err = client.Ping(context.Background(), nil)
if err != nil {
return nil, err
}
return client, err
}
// Create MongoDB connections pool with 'n' connections
func Create(murl string, n int) (mcp, error) {
var wg sync.WaitGroup
cp := mcp{murl: murl, count: 0, last: 0}
cp.mutex = &sync.Mutex{}
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
defer wg.Done()
nc, err := createConnection(cp.murl)
if err == nil {
cp.mutex.Lock()
cp.con = append(cp.con, nc)
cp.count++
cp.mutex.Unlock()
}
}()
}
wg.Wait()
return cp, nil
}
// Length return size of MongoDB connections pool
func (cp *mcp) Length() uint {
return cp.count
}
// Close single connection from pool
func (cp *mcp) Close(n uint) error {
if n >= 0 && n < cp.count {
err := cp.con[n].Ping(context.Background(), nil)
if err != nil {
return err
}
return cp.con[n].Disconnect(context.Background())
}
return errors.New("try to close not exists client")
}
// Destroy (close) all conections in pool
func (cp *mcp) Destroy() {
var i uint
for i = 0; i < cp.count; i++ {
cp.Close(i)
}
}
// Get single connection from connections pool
func (cp *mcp) Get(n uint) *mongo.Client {
if n < 0 || n > cp.count {
return nil
}
err := cp.con[n].Ping(context.Background(), nil)
if err != nil {
c, err := createConnection(cp.murl)
if err != nil {
return nil
}
cp.con[n] = c
}
return cp.con[n]
}
// GetRandom single connection from connections pool
func (cp *mcp) GetRandom() (*mongo.Client, uint32) {
rand.Seed(time.Now().UnixNano())
i := rand.Intn(int(cp.count))
return cp.con[i], uint32(i)
}
// GetRoundRobin gets single connection from connections pool by round robin
func (cp *mcp) GetRoundRobin() (*mongo.Client, uint32) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
cp.last++
if cp.last == cp.count {
cp.last = 0
}
return cp.con[cp.last], uint32(cp.last)
}
Výsledek
Pak může vypadat takto:

PS
Dobré podotknout, že technika, vzor, connection poolu je přenosný na jakýkoliv jiný programovací jazyk. Na implementaci v Go je hezké to, že se dá práce s databází pěkně paralelizovat a tak velice jednoduše performance nafukovat.