Craft ⏱ 45 min read

06 · Concurrency

Goroutines, channels, context, sync primitives, errgroup — with real LP examples

Coming from JavaScript? Go concurrency is not async/await. JS runs your code on a single thread with an event loop, so "async" only schedules callbacks. The Go runtime multiplexes goroutines onto a pool of OS threads, so they can genuinely run in parallel across CPU cores. There is no await keyword and no Promise chain. The mental model is closer to spawning worker threads, except that a goroutine starts with a stack of only about 2 KB, so a program can run hundreds of thousands of them.

1. Goroutines

Prefix any function call with go to run it concurrently. The caller doesn't wait — it continues immediately.

// spawn a goroutine — caller continues without waiting
go func() {
    fmt.Println("running concurrently")
}()

// pass args by value: n is a copy made when the goroutine is spawned
// (since Go 1.22 each iteration gets its own i, so capturing i directly is also safe)
for i := 0; i < 3; i++ {
    go func(n int) {
        fmt.Println(n) // safe: n is a copy
    }(i)
}

Real LP code — starting the HTTP server

In lp-reward-store-api/main.go, Fiber's Listen blocks, so it runs in a goroutine while main waits on the shutdown signal:

go func() {
    log.WithFields(logger.Fields{
        "address": cfg.Server.GetServerAddress(),
    }).Info("Starting HTTP server")

    if err := app.fiber.Listen(cfg.Server.GetServerAddress()); err != nil {
        serverErrors <- fmt.Errorf("server failed to start: %w", err)
    }
}()

Goroutine leak: every goroutine needs an exit path. A goroutine blocked forever on a channel or sleeping in a loop leaks memory. See Lesson 05 · Anti-Patterns for the goroutine leak pattern.

2. Channels

Channels are typed, goroutine-safe pipes. Send with ch <- v, receive with v := <-ch.

 
// unbuffered — sender blocks until receiver reads
ch := make(chan string)
go func() { ch <- "hello" }()  // blocks here until someone receives
msg := <-ch                    // unblocks the sender

// buffered cap=3 — sender only blocks when buffer full
jobs := make(chan int, 3)
jobs <- 1  // ok
jobs <- 2  // ok
jobs <- 3  // ok
// jobs <- 4 would BLOCK here: the buffer is full and nothing is receiving

// close signals "no more values"; range drains the buffer, then exits cleanly
close(jobs)
for j := range jobs { fmt.Println(j) } // prints 1, 2, 3
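
The send, receive, and close operations above can be combined into a small producer/consumer program. This is a sketch under simple assumptions: produce and sum are hypothetical helper names, and the buffer size of 2 is arbitrary.

```go
package main

import "fmt"

// produce sends 1..n on a new channel and closes it when done.
// Only the sender closes: closing tells receivers no more values are coming.
func produce(n int) <-chan int {
	out := make(chan int, 2) // small buffer; the producer blocks while it is full
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- i
		}
	}()
	return out
}

// sum drains the channel with range, which ends once the channel is closed and empty.
func sum(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

func main() {
	fmt.Println(sum(produce(4))) // 10

	// Receiving from a closed, empty channel yields the zero value and ok == false.
	v, ok := <-produce(0)
	fmt.Println(v, ok) // 0 false
}
```

Returning a receive-only channel (<-chan int) lets the compiler stop callers from sending on or closing a channel they do not own.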

Real LP code — server error + shutdown channels

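In lp-reward-store-api/main.go, both channels are buffered with cap 1. signal.Notify never blocks when it delivers a signal: if the channel is not ready to receive, the signal is dropped. The one-slot buffer ensures that a signal arriving before main reaches its receive is not lost, and it lets the server goroutine report its error and exit even if nobody is receiving yet: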

serverErrors := make(chan error, 1)   // buffered: sender (goroutine) never blocks
shutdown     := make(chan os.Signal, 1)

signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)

go func() {
    if err := app.fiber.Listen(addr); err != nil {
        serverErrors <- fmt.Errorf("server failed: %w", err)
    }
}()

3. select Multiplexing

select waits on multiple channel operations. The first one ready wins. If multiple are ready simultaneously, Go picks one at random.

select {
case v := <-ch1:
    fmt.Println("received from ch1:", v)
case v := <-ch2:
    fmt.Println("received from ch2:", v)
case <-time.After(5 * time.Second):
    fmt.Println("timeout — nothing arrived")
}

// non-blocking check with default
select {
case v := <-ch:
    process(v)
default:
    // ch was empty, continue without waiting
}

Real LP code — graceful shutdown pattern

lp-reward-store-api/main.go uses select to handle whichever arrives first — a startup failure or an OS signal:

select {
case err := <-serverErrors:
    log.WithError(err).Fatal("Server startup failed")

case sig := <-shutdown:
    log.WithFields(logger.Fields{
        "signal": sig.String(),
    }).Info("Received shutdown signal")

    if err := app.gracefulShutdown(); err != nil {
        log.WithError(err).Error("Graceful shutdown failed")
        os.Exit(1)
    }
    log.Info("Application shutdown completed")
}

4. context.Context

Context carries a deadline or cancellation signal across goroutine boundaries. Pass it as the first argument to every function that does I/O or spawns goroutines.

// Rule: always defer cancel() to release resources
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// pass ctx to anything that should respect the deadline
rows, err := db.QueryContext(ctx, query, args...)
if err != nil {
    return err // context.DeadlineExceeded if the timeout hit (check with errors.Is)
}
defer rows.Close()

// check cancellation inside a loop
for {
    select {
    case <-ctx.Done():
        return ctx.Err() // context.Canceled or DeadlineExceeded
    default:
        // do work
    }
}

Real LP code — DB connection timeout

In lp-coupon-api/main.go, startup waits at most 30s for the database to become healthy:

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

if err := app.db.WaitForConnection(ctx, 30*time.Second); err != nil {
    return fmt.Errorf("database connection timeout: %w", err)
}

Real LP code — Kafka consumer shutdown

In lp-coupon-consumer/main.go, a dedicated goroutine cancels the context when a shutdown signal arrives:

func (app *Application) consumeMessages(consumer *kafka.Consumer) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go func() {
        <-app.kafkaShutdownCh // wait for shutdown signal
        cancel()              // cancel context → stops ReadMessage
    }()

    for {
        msg, err := consumer.ReadMessage(ctx)
        if err != nil {
            if ctx.Err() != nil {
                app.logger.Info("Kafka consumer stopped")
                return // clean exit
            }
            app.logger.WithError(err).Error("Failed to read message")
            continue
        }
        // handle msg...
    }
}

5. sync.WaitGroup

WaitGroup is a counter. Add(n) before spawning, Done() inside each goroutine (use defer), Wait() blocks until the counter reaches zero.

var wg sync.WaitGroup

for i := 0; i < 3; i++ {
    wg.Add(1)
    go func(n int) {
        defer wg.Done() // decrements when function returns
        doWork(n)
    }(i)
}

wg.Wait() // blocks until all 3 goroutines call Done()

Real LP code — watching two MongoDB change streams

In consent-consumer/db/db.go, two goroutines watch separate collections; wg.Wait() keeps the process alive until both stop:

wg := new(sync.WaitGroup)
routineCtx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()

wg.Add(2)
go iterateChangeStream(routineCtx, wg, consentMasterStream, updateConsentMasterToSalesforce)
go iterateChangeStream(routineCtx, wg, consentDetailStream, updateConsentDetailToSalesforce)

wg.Wait()

// each goroutine calls wg.Done() via defer:
func iterateChangeStream(ctx context.Context, wg *sync.WaitGroup, stream *mongo.ChangeStream, updateFn func(bson.M)) {
    defer wg.Done()
    defer stream.Close(ctx)

    for stream.Next(ctx) {
        var data bson.M
        if err := stream.Decode(&data); err != nil { panic(err) }
        updateFn(data)
    }
}

Tip: Always call wg.Add(1) in the goroutine's parent, before spawning, and never inside the goroutine. If the goroutine has not been scheduled yet, Wait() could return before Add is called.
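
A common use of WaitGroup is gathering results from a fan-out. The sketch below assumes a hypothetical squareAll helper. Each goroutine writes to its own slice index, so no lock is needed, and Add is called in the parent before any goroutine starts.

```go
package main

import (
	"fmt"
	"sync"
)

// squareAll squares each input in its own goroutine.
// Every goroutine writes only to its own index of out, so the writes never overlap;
// wg.Wait() guarantees they are all complete before out is read.
func squareAll(nums []int) []int {
	out := make([]int, len(nums))
	var wg sync.WaitGroup

	wg.Add(len(nums)) // add in the parent, before spawning
	for i, n := range nums {
		go func(i, n int) {
			defer wg.Done()
			out[i] = n * n
		}(i, n)
	}

	wg.Wait()
	return out
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3})) // [1 4 9]
}
```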

6. sync.Mutex

Mutex is a mutual-exclusion lock. Only one goroutine holds the lock at a time. Use it whenever multiple goroutines access the same variable and at least one of them writes.

var mu sync.Mutex
var counter int

func increment() {
    mu.Lock()
    defer mu.Unlock() // always use defer to avoid forgetting unlock
    counter++
}

// For read-heavy data, use an RWMutex in place of the Mutex
// (guard each variable with one lock, never a mix of both):
var rw sync.RWMutex

func read() int {
    rw.RLock()  // multiple goroutines can hold RLock simultaneously
    defer rw.RUnlock()
    return counter
}

func write(v int) {
    rw.Lock()   // exclusive — blocks all readers and other writers
    defer rw.Unlock()
    counter = v
}

Real LP code — thread-safe tracing spans

In lp-coupon-api/pkg/trace/trace.go, multiple concurrent requests append spans to the same slice:

type Tracer struct {
    mu    sync.Mutex
    spans []Span
}

func (t *Tracer) Start(method string) func(err error) {
    start := time.Now()
    return func(err error) {
        span := Span{Method: method, Duration: time.Since(start)}
        t.mu.Lock()
        t.spans = append(t.spans, span) // safe: only one goroutine at a time
        t.mu.Unlock()
    }
}

func (t *Tracer) Spans() []Span {
    t.mu.Lock()
    defer t.mu.Unlock()
    return append([]Span{}, t.spans...) // return a copy, not the live slice
}

7. sync.Once

Once.Do(f) calls f exactly once, no matter how many goroutines call it concurrently. After the first call, all subsequent calls are no-ops. Canonical use: lazy singleton initialisation.

var (
    instance *Client
    once     sync.Once
)

func GetClient() *Client {
    once.Do(func() {
        instance = &Client{} // runs exactly once
    })
    return instance // all callers get the same pointer
}
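
Once.Do takes a function with no return value, so an initialiser that can fail needs somewhere to put its error. One possible approach, sketched here with a hypothetical dial function and a made-up address, is to store the error beside the instance and return both to every caller.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Client and dial are hypothetical stand-ins for a real connection type.
type Client struct{ addr string }

var dialCalls int // counts how often dial runs; only written inside once.Do

func dial(addr string) (*Client, error) {
	dialCalls++
	if addr == "" {
		return nil, errors.New("empty address")
	}
	return &Client{addr: addr}, nil
}

var (
	once     sync.Once
	instance *Client
	initErr  error
)

// GetClient initialises the client on first use and hands the same
// result (client or error) to every caller afterwards.
func GetClient() (*Client, error) {
	once.Do(func() {
		instance, initErr = dial("localhost:27017") // illustrative address
	})
	return instance, initErr
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := GetClient(); err != nil {
				fmt.Println("init failed:", err)
			}
		}()
	}
	wg.Wait()
	fmt.Println("dial calls:", dialCalls) // dial calls: 1
}
```

Note the trade-off: because Once never runs the function again, a failed initialisation is returned to every later caller and is not retried.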

Real LP code — MongoDB singleton

In lp-campaign-api/database/mongo.go, many request handlers may call Connect() concurrently on startup:

var (
    instance *MongoInstance
    once     sync.Once
)

func Connect() (*MongoInstance, error) {
    once.Do(func() {
        ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
        defer cancel()

        client, err := mongo.Connect(ctx, options.Client().ApplyURI(mongodbUri))
        if err != nil {
            panic(fmt.Sprintf("Failed to connect to MongoDB: %s", err))
        }
        instance = &MongoInstance{
            Client: client,
            DB:     client.Database(config.Env("MONGO_DB_NAME")),
        }
    })
    return instance, nil
}

8. Race Conditions & -race

A data race happens when two goroutines access the same variable concurrently and at least one is a write, with no synchronisation. Races are silent and non-deterministic: they corrupt data without crashing consistently.

// DATA RACE — counter is written by 100 goroutines simultaneously
var counter int
for i := 0; i < 100; i++ {
    go func() {
        counter++ // read + write, not atomic
    }()
}

Detect races with Go's built-in detector:

go test -race ./...
go run -race main.go

Always run -race in CI. The race detector is expensive: per the Go documentation, memory usage typically rises 5–10× and execution time 2–20×. That is fine for tests but not for production builds. Add it to your test pipeline: when a race is detected, the detector prints the two conflicting stack traces and the test fails.

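Fix options: protect the variable with a sync.Mutex, use sync/atomic for plain counters, or confine the variable to a single goroutine and send updates to it over a channel.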
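
As a sketch of how the racy counter could be repaired, the program below shows three race-free versions: a mutex, an atomic add, and a channel with a single owning goroutine. The function names are hypothetical; all three should pass go run -race.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countWithMutex guards counter with a lock.
func countWithMutex(n int) int {
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		counter int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			counter++
			mu.Unlock()
		}()
	}
	wg.Wait()
	return counter
}

// countWithAtomic makes each increment a single atomic operation.
func countWithAtomic(n int) int64 {
	var (
		wg      sync.WaitGroup
		counter int64
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt64(&counter, 1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&counter)
}

// countWithChannel confines counter to the receiving goroutine;
// the workers only send events and never touch the variable.
func countWithChannel(n int) int {
	events := make(chan struct{})
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			events <- struct{}{}
		}()
	}
	go func() { wg.Wait(); close(events) }() // close once every sender is done

	counter := 0
	for range events {
		counter++
	}
	return counter
}

func main() {
	fmt.Println(countWithMutex(100), countWithAtomic(100), countWithChannel(100)) // 100 100 100
}
```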

Real LP code — atomic counters in the ETL processor

In lp-coupon-api/scripts/etl/shared/processor.go, worker goroutines update shared counters. atomic.AddInt64 is faster than a mutex for plain integers:

var processedCount int64
var failedCount    int64

// inside each worker goroutine:
if err != nil {
    atomic.AddInt64(&failedCount, int64(len(job.Records)))
} else {
    atomic.AddInt64(&processedCount, int64(inserted))
}

// progress monitor goroutine reads them:
processed := atomic.LoadInt64(&processedCount)
failed    := atomic.LoadInt64(&failedCount)
fmt.Printf("Progress: %d processed, %d failed\n", processed, failed)

9. Worker Pool Pattern

Spawn N worker goroutines once, feed work through a channel, close the channel to signal "no more work". Workers drain the channel with range and exit naturally.

func workerPool(numWorkers int, jobs []Job) {
    jobChan := make(chan Job, numWorkers*2) // small buffer reduces blocking
    var wg sync.WaitGroup

    // spawn workers
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for job := range jobChan { // exits when jobChan is closed
                process(job)
            }
        }()
    }

    // send work
    for _, j := range jobs {
        jobChan <- j
    }
    close(jobChan) // signal: no more jobs

    wg.Wait() // wait for all workers to finish
}
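
When workers produce values rather than side effects, a second channel can carry results back. The sketch below is one way to arrange that, using a hypothetical squarePool function; it assumes numWorkers is at least 1.

```go
package main

import (
	"fmt"
	"sync"
)

// squarePool squares inputs on numWorkers goroutines and returns the sum of the squares.
// Assumption: numWorkers >= 1, otherwise nothing would ever receive from jobs.
func squarePool(numWorkers int, inputs []int) int {
	jobs := make(chan int)
	results := make(chan int)
	var wg sync.WaitGroup

	// spawn workers
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := range jobs { // exits when jobs is closed
				results <- n * n
			}
		}()
	}

	// feed jobs from a separate goroutine so the caller can read results meanwhile
	go func() {
		for _, n := range inputs {
			jobs <- n
		}
		close(jobs)
	}()

	// close results once every worker has exited, which ends the range below
	go func() {
		wg.Wait()
		close(results)
	}()

	total := 0
	for r := range results {
		total += r
	}
	return total
}

func main() {
	fmt.Println(squarePool(3, []int{1, 2, 3, 4})) // 30
}
```

Feeding and collecting from different goroutines matters here: if one goroutine sent every job before reading any result, the unbuffered channels would deadlock as soon as all workers were blocked on results.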

Real LP code — parallel ETL batch processing

The full pattern from lp-coupon-api/scripts/etl/shared/processor.go — workers process CSV batches in parallel, atomics track progress, a monitor goroutine prints stats every 10s:

jobChan := make(chan BatchJob, cfg.NumWorkers*2)
var wg sync.WaitGroup

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// start workers
for i := 0; i < cfg.NumWorkers; i++ {
    wg.Add(1)
    go func(workerID int) {
        defer wg.Done()
        for job := range jobChan {
            inserted, skipped, err := processBatch(db, job.Records, ...)
            if err != nil {
                atomic.AddInt64(&failedCount, int64(len(job.Records)))
            } else {
                atomic.AddInt64(&processedCount, int64(inserted))
                atomic.AddInt64(&skippedCount, int64(skipped))
            }
        }
    }(i)
}

// progress monitor
go func() {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            fmt.Printf("Progress: %d processed, %d failed\n",
                atomic.LoadInt64(&processedCount),
                atomic.LoadInt64(&failedCount))
        }
    }
}()

// feed work from CSV
for _, batch := range batches {
    jobChan <- BatchJob{Records: batch}
}
close(jobChan)
wg.Wait()

10. errgroup

golang.org/x/sync/errgroup is a WaitGroup that also collects errors. g.Wait() returns the first non-nil error from any goroutine. Paired with errgroup.WithContext, it cancels the shared context as soon as any goroutine fails.

import "golang.org/x/sync/errgroup"

g, ctx := errgroup.WithContext(context.Background())

g.Go(func() error {
    return fetchFromDB(ctx) // if this errors, ctx is cancelled
})
g.Go(func() error {
    return fetchFromCache(ctx) // this will see ctx.Done() if DB call fails
})

if err := g.Wait(); err != nil {
    return err // first non-nil error
}

Comparison: WaitGroup vs errgroup for the ETL worker pool

WaitGroup (current LP code)
var wg sync.WaitGroup
var firstErr error
var mu sync.Mutex

for i := 0; i < N; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        if err := work(); err != nil {
            mu.Lock()
            if firstErr == nil {
                firstErr = err
            }
            mu.Unlock()
        }
    }()
}
wg.Wait()
return firstErr

errgroup (cleaner)
g, ctx := errgroup.WithContext(ctx)

for i := 0; i < N; i++ {
    g.Go(func() error {
        return work(ctx)
        // error auto-cancels ctx
        // other workers see ctx.Done()
    })
}

return g.Wait()
// returns first error

When to prefer errgroup: Use it whenever goroutines can return errors and you want "fail fast" semantics, cancelling everything as soon as one fails. WaitGroup is still fine when goroutines handle their own errors internally (e.g., logging and continuing).

Key Takeaways