Chris Langager

Concurrent Workers (Scatter/Gather)

Sometimes you need to make a whole bunch of API calls and maybe do something with all of the responses. In many situations, doing things sequentially, one API call after another is totally fine and probably necessary. For example, you might need a piece of data from the first API response in order to form the request for the second API call.

There are other times where this doesn't matter and you'd like to fire off a bunch of API calls all at the same time. This same technique can be applied for any kind of "work" (ie. db query, some slow library, etc...) but for this example, we'll stick with an API call. Here's how I like to do that:

package main

import "sync"

func main() {
  workerCount := 10 // controls concurrency (ex. how many API calls can be in flight at any given time)

  type WorkerInput struct {
    // whatever data you want to pass into your workers
  }
  workerInputChan := make(chan WorkerInput)

  type WorkerResult struct {
    // whatever data you want your workers to return
  }
  workerResultChan := make(chan WorkerResult)

  startWorker := func() {
    for input := range workerInputChan {
      fmt.Printf("worker is working on %+v...\n", input)
      // actually do the thing you want your workers to do (ex. make API calls and parse responses)

      workerResultChan <- WorkerResult{
        // set values after worker has completed their task
      }
    }
  }

  // start workers
  var wg sync.WaitGroup
  for range workerCount {
    wg.Add(1)
    go func() {
      defer wg.Done()
      startWorker()
    }()
  }

  workerInputs := []WorkerInput{
    // populate the input you want to pass to your workers
  }

  // enqueue jobs and then close the input queue - this will cause workers to shut down when there's no more work
  go func() {
    for _, workerInput := range workerInputs {
      workerInputChan <- workerInput
    }
    close(workerInputChan)
  }()

  // close the results queue after the workers are all done - this will cause the results aggregator loop below to exit
  go func() {
    wg.Wait()
    close(workerResultChan)
  }()

  for result := range workerResultChan {
    fmt.Printf("processing result %+v...", result)
    // do whatever you need to do with the results
  }
}
Go
Software