06 · Concurrency
Goroutines, channels, context, sync primitives, errgroup — with real LP examples
Go has no await keyword and no Promise chain.
The mental model is closer to spawning actual worker threads, except that each goroutine starts with a stack of only about 2 KB, so you can have millions of them.
1. Goroutines
Prefix any function call with go to run it concurrently. The caller doesn't wait — it continues immediately.
// spawn a goroutine — caller continues without waiting
go func() {
fmt.Println("running concurrently")
}()
// pass args by value — capture at spawn time, not closure time
for i := 0; i < 3; i++ {
go func(n int) {
fmt.Println(n) // safe: n is a copy
}(i)
}
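Because the caller does not wait, it needs an explicit way to collect a goroutine's result. The sketch below is one minimal way to do that with a channel; squareAsync is a hypothetical helper name chosen for illustration, not something from the LP codebase.

```go
package main

import "fmt"

// squareAsync (hypothetical helper) starts a goroutine and returns a
// channel on which the result will be delivered.
func squareAsync(n int) <-chan int {
	out := make(chan int, 1) // cap 1: the goroutine can send and exit without waiting
	go func() {
		out <- n * n
	}()
	return out
}

func main() {
	result := squareAsync(7) // returns immediately
	fmt.Println("caller keeps running")
	fmt.Println(<-result) // blocks until the goroutine has sent: prints 49
}
```

If main returned without receiving, the program could exit before the goroutine ever ran.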
Real LP code — starting the HTTP server
In lp-reward-store-api/main.go, Fiber's Listen blocks, so it runs in a goroutine while main waits on the shutdown signal:
go func() {
log.WithFields(logger.Fields{
"address": cfg.Server.GetServerAddress(),
}).Info("Starting HTTP server")
if err := app.fiber.Listen(cfg.Server.GetServerAddress()); err != nil {
serverErrors <- fmt.Errorf("server failed to start: %w", err)
}
}()
2. Channels
Channels are typed, goroutine-safe pipes. Send with ch <- v, receive with v := <-ch.
- Unbuffered make(chan T): sender blocks until a receiver is ready. Synchronises two goroutines.
- Buffered make(chan T, n): sender only blocks when the buffer is full.
// unbuffered — sender blocks until receiver reads
ch := make(chan string)
go func() { ch <- "hello" }() // blocks here until someone receives
msg := <-ch // unblocks the sender
// buffered cap=3 — sender only blocks when buffer full
jobs := make(chan int, 3)
jobs <- 1 // ok
jobs <- 2 // ok
jobs <- 3 // ok
// jobs <- 4 would BLOCK here: the buffer is full and nothing is receiving
// close signals "no more values" — range exits cleanly
close(jobs)
for j := range jobs { fmt.Println(j) }
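To make the close-and-range idiom concrete, here is a small producer/consumer sketch. The function names produce and sum are illustrative assumptions; the point is that the sender closes the channel and the receiver's range loop ends on its own.

```go
package main

import "fmt"

// produce sends 0..n-1 on an unbuffered channel and closes it when done.
func produce(n int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch) // only the sending side closes the channel
		for i := 0; i < n; i++ {
			ch <- i
		}
	}()
	return ch
}

// sum drains the channel; range exits once ch is closed and empty.
func sum(ch <-chan int) int {
	total := 0
	for v := range ch {
		total += v
	}
	return total
}

func main() {
	fmt.Println(sum(produce(5))) // 0+1+2+3+4 = 10

	done := make(chan int)
	close(done)
	v, ok := <-done    // receiving from a closed channel never blocks
	fmt.Println(v, ok) // 0 false: zero value, ok reports the channel is closed
}
```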
Real LP code — server error + shutdown channels
In lp-reward-store-api/main.go, both channels are buffered with cap 1. signal.Notify never blocks when it delivers a signal, so with an unbuffered channel a signal that arrives before main is receiving would be dropped:
serverErrors := make(chan error, 1) // buffered: sender (goroutine) never blocks
shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)
go func() {
if err := app.fiber.Listen(addr); err != nil {
serverErrors <- fmt.Errorf("server failed: %w", err)
}
}()
3. select Multiplexing
select waits on multiple channel operations. The first one ready wins. If multiple are ready simultaneously, Go picks one at random.
select {
case v := <-ch1:
fmt.Println("received from ch1:", v)
case v := <-ch2:
fmt.Println("received from ch2:", v)
case <-time.After(5 * time.Second):
fmt.Println("timeout — nothing arrived")
}
// non-blocking check with default
select {
case v := <-ch:
process(v)
default:
// ch was empty, continue without waiting
}
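The timeout case is often wrapped in a small helper. The sketch below assumes a hypothetical recvWithTimeout function; it uses time.NewTimer instead of time.After so the timer can be stopped as soon as a value arrives.

```go
package main

import (
	"fmt"
	"time"
)

// recvWithTimeout (hypothetical helper) waits up to d for a value on ch.
// The bool result is false when the timeout fired first.
func recvWithTimeout(ch <-chan string, d time.Duration) (string, bool) {
	timer := time.NewTimer(d)
	defer timer.Stop() // release the timer if the value arrived first
	select {
	case v := <-ch:
		return v, true
	case <-timer.C:
		return "", false
	}
}

func main() {
	ready := make(chan string, 1)
	ready <- "hello"
	v, ok := recvWithTimeout(ready, time.Second)
	fmt.Printf("%q %v\n", v, ok) // "hello" true

	empty := make(chan string)
	v, ok = recvWithTimeout(empty, 50*time.Millisecond)
	fmt.Printf("%q %v\n", v, ok) // "" false
}
```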
Real LP code — graceful shutdown pattern
lp-reward-store-api/main.go uses select to handle whichever arrives first — a startup failure or an OS signal:
select {
case err := <-serverErrors:
log.WithError(err).Fatal("Server startup failed")
case sig := <-shutdown:
log.WithFields(logger.Fields{
"signal": sig.String(),
}).Info("Received shutdown signal")
if err := app.gracefulShutdown(); err != nil {
log.WithError(err).Error("Graceful shutdown failed")
os.Exit(1)
}
log.Info("Application shutdown completed")
}
4. context.Context
Context carries a deadline or cancellation signal across goroutine boundaries. Pass it as the first argument to every function that does I/O or spawns goroutines.
- context.WithTimeout(parent, d): cancels after duration d
- context.WithDeadline(parent, t): cancels at absolute time t
- context.WithCancel(parent): you call cancel() manually
// Rule: always defer cancel() to release resources
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// pass ctx to anything that should respect the deadline
rows, err := db.QueryContext(ctx, query, args...) // returns (*sql.Rows, error)
if err != nil {
return err // typically context.DeadlineExceeded if the timeout was hit
}
defer rows.Close()
// check cancellation inside a loop
for {
select {
case <-ctx.Done():
return ctx.Err() // context.Canceled or DeadlineExceeded
default:
// do work
}
}
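The following sketch shows a deadline propagating into a blocking call. slowOperation and runWithTimeout are hypothetical stand-ins for real I/O, assumed here only to demonstrate how ctx.Done() interrupts the wait.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowOperation (hypothetical) pretends to do I/O that needs `work` to
// finish, but gives up as soon as ctx is cancelled or times out.
func slowOperation(ctx context.Context, work time.Duration) error {
	timer := time.NewTimer(work)
	defer timer.Stop()
	select {
	case <-timer.C:
		return nil // finished in time
	case <-ctx.Done():
		return ctx.Err() // context.Canceled or context.DeadlineExceeded
	}
}

// runWithTimeout (hypothetical) derives a context with a timeout and
// passes it down, releasing its resources with defer cancel().
func runWithTimeout(timeout, work time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return slowOperation(ctx, work)
}

func main() {
	fmt.Println(runWithTimeout(time.Second, 10*time.Millisecond)) // <nil>

	err := runWithTimeout(10*time.Millisecond, time.Second)
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}
```

Blocking in select on ctx.Done() avoids the busy-wait that a default branch would cause when there is no work to do between checks.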
Real LP code — DB connection timeout
In lp-coupon-api/main.go, startup waits at most 30s for the database to become healthy:
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := app.db.WaitForConnection(ctx, 30*time.Second); err != nil {
return fmt.Errorf("database connection timeout: %w", err)
}
Real LP code — Kafka consumer shutdown
In lp-coupon-consumer/main.go, a dedicated goroutine cancels the context when a shutdown signal arrives:
func (app *Application) consumeMessages(consumer *kafka.Consumer) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
<-app.kafkaShutdownCh // wait for shutdown signal
cancel() // cancel context → stops ReadMessage
}()
for {
msg, err := consumer.ReadMessage(ctx)
if err != nil {
if ctx.Err() != nil {
app.logger.Info("Kafka consumer stopped")
return // clean exit
}
app.logger.WithError(err).Error("Failed to read message")
continue
}
// handle msg...
}
}
5. sync.WaitGroup
WaitGroup is a counter. Add(n) before spawning, Done() inside each goroutine (use defer), Wait() blocks until the counter reaches zero.
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done() // decrements when function returns
doWork(n)
}(i)
}
wg.Wait() // blocks until all 3 goroutines call Done()
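A common follow-up question is how to get results back. One option, sketched below under the assumption of a hypothetical squares function, is to give every goroutine its own slice index so that no two goroutines write to the same memory.

```go
package main

import (
	"fmt"
	"sync"
)

// squares computes n*n for each input concurrently. Each goroutine writes
// only to its own index of results, so no mutex is required.
func squares(inputs []int) []int {
	results := make([]int, len(inputs))
	var wg sync.WaitGroup
	for i, n := range inputs {
		wg.Add(1) // in the parent, before the goroutine starts
		go func(i, n int) {
			defer wg.Done()
			results[i] = n * n
		}(i, n)
	}
	wg.Wait() // after Wait returns, all writes to results are visible here
	return results
}

func main() {
	fmt.Println(squares([]int{1, 2, 3})) // [1 4 9]
}
```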
Real LP code — watching two MongoDB change streams
In consent-consumer/db/db.go, two goroutines watch separate collections; wg.Wait() keeps the process alive until both stop:
wg := new(sync.WaitGroup)
routineCtx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
wg.Add(2)
go iterateChangeStream(routineCtx, wg, consentMasterStream, updateConsentMasterToSalesforce)
go iterateChangeStream(routineCtx, wg, consentDetailStream, updateConsentDetailToSalesforce)
wg.Wait()
// each goroutine calls wg.Done() via defer:
func iterateChangeStream(ctx context.Context, wg *sync.WaitGroup, stream *mongo.ChangeStream, updateFn func(bson.M)) {
defer wg.Done()
defer stream.Close(ctx)
for stream.Next(ctx) {
var data bson.M
if err := stream.Decode(&data); err != nil { panic(err) }
updateFn(data)
}
}
Call wg.Add(1) in the goroutine's parent, before spawning, never inside the goroutine. If Add ran inside the goroutine, Wait() could see a zero counter and return before the goroutine had been scheduled and called Add.
6. sync.Mutex
Mutex is a mutual-exclusion lock. Only one goroutine holds the lock at a time. Use it when multiple goroutines read and write the same variable and at least one of them writes.
var mu sync.Mutex
var counter int
func increment() {
mu.Lock()
defer mu.Unlock() // always use defer to avoid forgetting unlock
counter++
}
// For read-heavy data, use RWMutex:
var rw sync.RWMutex
func read() int {
rw.RLock() // multiple goroutines can hold RLock simultaneously
defer rw.RUnlock()
return counter
}
func write(v int) {
rw.Lock() // exclusive — blocks all readers and other writers
defer rw.Unlock()
counter = v
}
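In practice the mutex usually lives next to the data it guards. The sketch below assumes a hypothetical SafeCounter type and a countConcurrently driver to show that the final count is deterministic once every access goes through the lock.

```go
package main

import (
	"fmt"
	"sync"
)

// SafeCounter (hypothetical type) keeps the lock beside the field it protects.
type SafeCounter struct {
	mu sync.Mutex
	n  int
}

// Inc adds one while holding the lock.
func (c *SafeCounter) Inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.n++
}

// Value reads the count while holding the lock.
func (c *SafeCounter) Value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.n
}

// countConcurrently increments a single counter from `workers` goroutines.
func countConcurrently(workers int) int {
	var c SafeCounter
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Inc()
		}()
	}
	wg.Wait()
	return c.Value()
}

func main() {
	fmt.Println(countConcurrently(1000)) // 1000
}
```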
Real LP code — thread-safe tracing spans
In lp-coupon-api/pkg/trace/trace.go, multiple concurrent requests append spans to the same slice:
type Tracer struct {
mu sync.Mutex
spans []Span
}
func (t *Tracer) Start(method string) func(err error) {
start := time.Now()
return func(err error) {
span := Span{Method: method, Duration: time.Since(start)}
t.mu.Lock()
t.spans = append(t.spans, span) // safe: only one goroutine at a time
t.mu.Unlock()
}
}
func (t *Tracer) Spans() []Span {
t.mu.Lock()
defer t.mu.Unlock()
return append([]Span{}, t.spans...) // return a copy, not the live slice
}
7. sync.Once
Once.Do(f) calls f exactly once, no matter how many goroutines call it concurrently. After the first call, all subsequent calls are no-ops. Canonical use: lazy singleton initialisation.
var (
instance *Client
once sync.Once
)
func GetClient() *Client {
once.Do(func() {
instance = &Client{} // runs exactly once
})
return instance // all callers get the same pointer
}
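To see the "exactly once" guarantee in action, the sketch below counts how often the initialiser runs when many goroutines race to call it. Config, GetConfig, and callConcurrently are hypothetical names used only for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Config is a hypothetical singleton payload.
type Config struct{ Name string }

var (
	cfg       *Config
	cfgOnce   sync.Once
	initCalls int // written only inside cfgOnce.Do, so it needs no extra lock
)

// GetConfig lazily initialises cfg; concurrent callers wait for the first
// call to finish and then all receive the same pointer.
func GetConfig() *Config {
	cfgOnce.Do(func() {
		initCalls++
		cfg = &Config{Name: "default"}
	})
	return cfg
}

// callConcurrently calls GetConfig from n goroutines and reports how many
// times the initialiser actually ran.
func callConcurrently(n int) int {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = GetConfig()
		}()
	}
	wg.Wait()
	return initCalls
}

func main() {
	fmt.Println(callConcurrently(50)) // 1
}
```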
Real LP code — MongoDB singleton
In lp-campaign-api/database/mongo.go, many request handlers may call Connect() concurrently on startup:
var (
instance *MongoInstance
once sync.Once
)
func Connect() (*MongoInstance, error) {
once.Do(func() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
client, err := mongo.Connect(ctx, options.Client().ApplyURI(mongodbUri))
if err != nil {
panic(fmt.Sprintf("Failed to connect to MongoDB: %s", err))
}
instance = &MongoInstance{
Client: client,
DB: client.Database(config.Env("MONGO_DB_NAME")),
}
})
return instance, nil
}
8. Race Conditions & -race
A data race happens when two goroutines access the same variable concurrently, at least one access is a write, and nothing synchronises them. Races are silent and non-deterministic: they tend to corrupt data quietly rather than crash consistently.
// DATA RACE — counter is written by 100 goroutines simultaneously
var counter int
for i := 0; i < 100; i++ {
go func() {
counter++ // read + write, not atomic
}()
}
Detect races with Go's built-in detector:
go test -race ./...
go run -race main.go
Run -race in CI.
The race detector typically increases memory use by 5–10× and execution time by 2–20×, which is fine for tests but not for production builds.
Add it to your test pipeline: if a race is detected, the test fails and the report shows the two conflicting stack traces.
Fix options:
- Use sync.Mutex: for structs and composite types (see §6)
- Use sync/atomic: for simple integer counters
- Channel ownership: have one goroutine own the data
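As a sketch of the sync/atomic option, here is the racy counter loop from earlier in this section rewritten so that each increment is a single indivisible operation. The function name countAtomic is an assumption for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countAtomic increments a shared counter from many goroutines without a
// data race by using atomic.AddInt64 instead of counter++.
func countAtomic(goroutines int) int64 {
	var counter int64
	var wg sync.WaitGroup
	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt64(&counter, 1) // indivisible read-modify-write
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&counter)
}

func main() {
	fmt.Println(countAtomic(100)) // 100
}
```

Running this version under go run -race should report no races, whereas the counter++ version is flagged.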
Real LP code — atomic counters in the ETL processor
In lp-coupon-api/scripts/etl/shared/processor.go, worker goroutines update shared counters. atomic.AddInt64 is faster than a mutex for plain integers:
var processedCount int64
var failedCount int64
// inside each worker goroutine:
if err != nil {
atomic.AddInt64(&failedCount, int64(len(job.Records)))
} else {
atomic.AddInt64(&processedCount, int64(inserted))
}
// progress monitor goroutine reads them:
processed := atomic.LoadInt64(&processedCount)
failed := atomic.LoadInt64(&failedCount)
fmt.Printf("Progress: %d processed, %d failed\n", processed, failed)
9. Worker Pool Pattern
Spawn N worker goroutines once, feed work through a channel, close the channel to signal "no more work". Workers drain the channel with range and exit naturally.
func workerPool(numWorkers int, jobs []Job) {
jobChan := make(chan Job, numWorkers*2) // small buffer reduces blocking
var wg sync.WaitGroup
// spawn workers
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for job := range jobChan { // exits when jobChan is closed
process(job)
}
}()
}
// send work
for _, j := range jobs {
jobChan <- j
}
close(jobChan) // signal: no more jobs
wg.Wait() // wait for all workers to finish
}
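The pool above discards results. A possible extension, sketched here with a hypothetical sumOfSquares function, adds a results channel that is closed once every worker has exited. It assumes numWorkers is at least 1.

```go
package main

import (
	"fmt"
	"sync"
)

// sumOfSquares fans inputs out to numWorkers goroutines and adds up the
// squared values they send back. Assumes numWorkers >= 1.
func sumOfSquares(numWorkers int, inputs []int) int {
	jobs := make(chan int)
	results := make(chan int)

	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := range jobs { // exits when jobs is closed
				results <- n * n
			}
		}()
	}

	// Feed jobs from a separate goroutine so this function can start
	// receiving results straight away.
	go func() {
		for _, n := range inputs {
			jobs <- n
		}
		close(jobs) // signal: no more jobs
	}()

	// Close results once all workers are done, which ends the loop below.
	go func() {
		wg.Wait()
		close(results)
	}()

	total := 0
	for r := range results {
		total += r
	}
	return total
}

func main() {
	fmt.Println(sumOfSquares(3, []int{1, 2, 3, 4})) // 1+4+9+16 = 30
}
```

Closing results from a dedicated goroutine keeps the rule that only the sending side closes a channel, while still letting the caller range over it.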
Real LP code — parallel ETL batch processing
The full pattern from lp-coupon-api/scripts/etl/shared/processor.go — workers process CSV batches in parallel, atomics track progress, a monitor goroutine prints stats every 10s:
jobChan := make(chan BatchJob, cfg.NumWorkers*2)
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// start workers
for i := 0; i < cfg.NumWorkers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for job := range jobChan {
inserted, skipped, err := processBatch(db, job.Records, ...)
if err != nil {
atomic.AddInt64(&failedCount, int64(len(job.Records)))
} else {
atomic.AddInt64(&processedCount, int64(inserted))
atomic.AddInt64(&skippedCount, int64(skipped))
}
}
}(i)
}
// progress monitor
go func() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
fmt.Printf("Progress: %d processed, %d failed\n",
atomic.LoadInt64(&processedCount),
atomic.LoadInt64(&failedCount))
}
}
}()
// feed work from CSV
for _, batch := range batches {
jobChan <- BatchJob{Records: batch}
}
close(jobChan)
wg.Wait()
10. errgroup
golang.org/x/sync/errgroup is a WaitGroup that also collects errors. g.Wait() returns the first non-nil error from any goroutine. Paired with errgroup.WithContext, it cancels the shared context as soon as any goroutine fails.
import "golang.org/x/sync/errgroup"
g, ctx := errgroup.WithContext(context.Background())
g.Go(func() error {
return fetchFromDB(ctx) // if this errors, ctx is cancelled
})
g.Go(func() error {
return fetchFromCache(ctx) // this will see ctx.Done() if DB call fails
})
if err := g.Wait(); err != nil {
return err // first non-nil error
}
Comparison: WaitGroup vs errgroup for the ETL worker pool
// sync.WaitGroup version: the first error is captured by hand
var wg sync.WaitGroup
var firstErr error
var mu sync.Mutex
for i := 0; i < N; i++ {
wg.Add(1)
go func() {
defer wg.Done()
if err := work(); err != nil {
mu.Lock()
if firstErr == nil {
firstErr = err
}
mu.Unlock()
}
}()
}
wg.Wait()
return firstErr
// errgroup version: the same logic with fail-fast cancellation
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < N; i++ {
g.Go(func() error {
return work(ctx)
// error auto-cancels ctx
// other workers see ctx.Done()
})
}
return g.Wait()
// returns first error
errgroup: use it whenever goroutines can return errors and you want "fail fast" semantics, cancelling everything as soon as one fails.
WaitGroup is still fine when goroutines handle their own errors internally (e.g., logging and continuing).
Key Takeaways
- Goroutines are cheap (~2 KB); spawn freely, but always give them an exit path via ctx.Done() or channel close
- Unbuffered channels synchronise two goroutines; buffered channels decouple them up to capacity
- select waits on multiple channels; use it for graceful shutdown, timeouts, and multiplexing
- Pass context.Context as the first argument to every function doing I/O; always defer cancel()
- sync.WaitGroup waits for N goroutines; sync.Mutex protects shared state; sync.Once initialises singletons
- Run go test -race ./... before every merge touching concurrent code; the race detector is your safety net
- Worker pool = buffered channel + N goroutines ranging over it + close(ch) to signal done
- Prefer errgroup over raw WaitGroup when goroutines can fail and you want fail-fast cancellation