go: mongo connection pool

Píšu celkem jednoduchou aplikaci pro práci nad Git repozitáři v GitHubu. Jedním z požadavků je perzistování nagrebovaných meta informací do MongoDB pro další analytické zpracování.

Úzkým hrdlem je práce se samotnou databázi, kde paralelně (ok, konkurenčně) asynchronní zpracování získaných dat pak brzí samotná sdílená konektivita. Aby se mi jednotlivé gorutiny neblokovaly na sdílené globální konektivitě, napsal jsem si jednoduchý connection pool, který mi vytváří a poskytuje nezávislé fyzické konektivity, které pak předávám gorutinám, a tak odblokovávám špunt na globální konektivitě.

Díky connection poolu pak můžu paralelně spouštět jednotlive funkce bez toho, že by si mi v rámci konektivity navzájem brzdily.

GitHub

Svoji implementaci connection poolu jsem zveřejnil pod MIT licencí na Githubu. Repo najdete zde.

main.go

package main

import (
	"context"
	"fmt"
	"mcp-test/mcp"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

func main() {

	// create mongo connection pool with 20 independent connections
	cp, err := mcp.Create("mongodb://localhost:27017", 20)
	if err != nil {
		fmt.Println(err)
	}
	fmt.Println("connections pool length:", cp.Length())

	// get single connection from connection pool
	c, i := cp.GetRandom()
	fmt.Println("connection id:", i)

	// get data through geted connection
	getData(c, "test", "users")

	// close all created connections in pool
	cp.Destroy()
}

// getData is helper function for getting examples data from db
func getData(c *mongo.Client, db, coll string) {
	cur, err := c.
		Database(db).
		Collection(coll).
		Find(context.Background(), bson.D{})

	if err != nil {
		fmt.Println("ERR")
		fmt.Println(err)
	}
	defer cur.Close(context.Background())

	for cur.Next(context.Background()) {
		var result interface{}
		err := cur.Decode(&result)
		if err != nil {
			fmt.Println(err)
		}
		fmt.Println(result)
	}

}

Asi nejzajímavější je funkce Create, která vytváří samotný connection pool, a pak z něj pomocí funkce GetRandom získávám jednotlovou konektivitu, která je pak už klasicky použitelná jako běžný MongoDB kient. Vedle funce GetRandom modul disponuje funkcemi GetRoundRobin, která vrací z poolu konektivity stále dokola dle pořadí, a pak funkci Get, která umí parametrem určit, ktetá konkrétní konektivita z poolu se má použít.

mcp.go

package mcp

import (
	"context"
	"errors"
	"math/rand"
	"sync"
	"time"

	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

type mcp struct {
	mutex *sync.Mutex
	count uint
	last  uint
	murl  string
	con   []*mongo.Client
}

// createConnection internal func for raw MongoDB connection
func createConnection(murl string) (*mongo.Client, error) {
	client, err := mongo.
		Connect(context.Background(), options.
			Client().
			ApplyURI(murl))
	if err != nil {
		return nil, err
	}

	err = client.Ping(context.Background(), nil)
	if err != nil {
		return nil, err
	}

	return client, err
}

// Create MongoDB connections pool with 'n' connections
func Create(murl string, n int) (mcp, error) {
	var wg sync.WaitGroup

	cp := mcp{murl: murl, count: 0, last: 0}
	cp.mutex = &sync.Mutex{}

	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			nc, err := createConnection(cp.murl)
			if err == nil {
				cp.mutex.Lock()
				cp.con = append(cp.con, nc)
				cp.count++
				cp.mutex.Unlock()
			}
		}()
	}
	wg.Wait()

	return cp, nil
}

// Length return size of MongoDB connections pool
func (cp *mcp) Length() uint {
	return cp.count
}

// Close single connection from pool
func (cp *mcp) Close(n uint) error {
	if n >= 0 && n < cp.count {
		err := cp.con[n].Ping(context.Background(), nil)
		if err != nil {
			return err
		}
		return cp.con[n].Disconnect(context.Background())
	}
	return errors.New("try to close not exists client")
}

// Destroy (close) all conections in pool
func (cp *mcp) Destroy() {
	var i uint
	for i = 0; i < cp.count; i++ {
		cp.Close(i)
	}
}

// Get single connection from connections pool
func (cp *mcp) Get(n uint) *mongo.Client {
	if n < 0 || n > cp.count {
		return nil
	}

	err := cp.con[n].Ping(context.Background(), nil)
	if err != nil {
		c, err := createConnection(cp.murl)
		if err != nil {
			return nil
		}
		cp.con[n] = c
	}

	return cp.con[n]
}

// GetRandom single connection from connections pool
func (cp *mcp) GetRandom() (*mongo.Client, uint32) {
	rand.Seed(time.Now().UnixNano())
	i := rand.Intn(int(cp.count))
	return cp.con[i], uint32(i)
}

// GetRoundRobin gets single connection from connections pool by round robin
func (cp *mcp) GetRoundRobin() (*mongo.Client, uint32) {
	cp.mutex.Lock()
	defer cp.mutex.Unlock()

	cp.last++
	if cp.last == cp.count {
		cp.last = 0
	}
	return cp.con[cp.last], uint32(cp.last)
}

Výsledek

Pak může vypadat takto:

PS

Dobré podotknout, že technika, vzor, connection poolu je přenosný na jakýkoliv jiný programovací jazyk. Na implementaci v Go je hezké to, že se dá práce s databází pěkně paralelizovat a tak velice jednoduše performance nafukovat.