Table of Contents
- Introduction
- Prerequisites
- Setup
- Creating a Worker Pool
- Putting Jobs into the Pool
- Handling Results
- Monitoring and Shutdown
- Conclusion
Introduction
In this tutorial, we will learn how to implement a worker pool using goroutines in Go. A worker pool is a common concurrency pattern that allows a program to perform multiple tasks concurrently by distributing them among a fixed number of worker goroutines. By the end of this tutorial, you will be able to create and use a worker pool in your own Go programs.
Prerequisites
To follow along with this tutorial, you should have a basic understanding of the Go programming language, including goroutines and channels. You will also need Go installed on your system to run the examples.
Setup
Before we get started, let’s set up our project. Create a new directory for your Go project and navigate to it in the terminal. Initialize a Go module by running the following command:
go mod init worker-pool-tutorial
This creates a go.mod file, which Go uses to track the module path and the project's dependencies.
Creating a Worker Pool
A worker pool consists of a fixed number of worker goroutines that wait for jobs to be assigned to them. We can start by defining a worker function that will perform the actual task. Create a new file named worker.go and add the following code:
package main

import (
	"fmt"
	"time"
)

// worker receives jobs until the jobs channel is closed and sends
// each job's result (the job value doubled) on the results channel.
func worker(id int, jobs <-chan int, results chan<- int) {
	for j := range jobs {
		fmt.Printf("Worker %d started job %d\n", id, j)
		time.Sleep(time.Second) // Simulating work
		fmt.Printf("Worker %d finished job %d\n", id, j)
		results <- j * 2
	}
}
In the code above, the worker function takes three parameters: id identifies the worker, jobs is a receive-only channel from which the worker receives jobs, and results is a send-only channel on which the worker sends its results.
Inside the function, we use a for loop with range to receive jobs from the jobs channel until the channel is closed and every remaining job has been received. For each job, we print a message indicating that the worker has started the job, simulate some work using time.Sleep, print a completion message, and then send the result back through the results channel.
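The loop's exit condition is worth seeing in isolation. The following standalone sketch (the helper name drain is made up for illustration and is not part of the tutorial's code) shows that ranging over a channel still delivers values that were buffered before close, and stops only once the channel is both closed and empty.

```go
package main

import "fmt"

// drain is a hypothetical helper: it receives every value from ch until
// the channel is closed and empty, then returns the values in the order
// they were received.
func drain(ch <-chan int) []int {
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	ch <- 3
	close(ch) // values already in the buffer stay readable after close

	fmt.Println(drain(ch)) // prints [1 2 3]
}
```

This is why the worker needs no explicit stop signal: closing the jobs channel is enough to end its loop after the last job has been handed out.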
Putting Jobs into the Pool
Next, let's write the main function, which starts the workers and distributes jobs to them. Create a new file named main.go and add the following code:
package main

import "fmt"

func main() {
	jobs := make(chan int, 100)
	results := make(chan int, 100)

	// Create 3 worker goroutines
	for i := 1; i <= 3; i++ {
		go worker(i, jobs, results)
	}

	// Add 5 jobs to the job channel
	for i := 1; i <= 5; i++ {
		jobs <- i
	}
	close(jobs)

	// Collect and print the results
	for i := 1; i <= 5; i++ {
		result := <-results
		fmt.Printf("Result: %d\n", result)
	}
}
In the code above, we first create two buffered channels, each with a capacity of 100: jobs to send jobs to the workers and results to receive the results from the workers. We then start three workers by calling the worker function in separate goroutines.
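Next, we add 5 jobs to the jobs channel and close it to signal that no more jobs will be added. Finally, we use a for loop to receive exactly 5 results from the results channel and print them. Because the workers run concurrently, the results may arrive in any order. Since the program is split across worker.go and main.go, run it with go run . so that both files are compiled together.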
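The channel capacity matters in this version: main sends every job before it reads any result, which is safe here because the buffers are far larger than the number of jobs. With unbuffered channels the same sequence could deadlock, since the workers would block sending results while main is still blocked sending jobs. As a rough sketch of one alternative, assuming a hypothetical helper named process and a simplified worker without logging or delay, the jobs can be sent from their own goroutine so that unbuffered channels work too:

```go
package main

import (
	"fmt"
	"sort"
)

// worker doubles each job it receives. It mirrors the tutorial's worker
// without the logging and the simulated delay.
func worker(jobs <-chan int, results chan<- int) {
	for j := range jobs {
		results <- j * 2
	}
}

// process is a hypothetical helper: it runs numWorkers workers over
// inputs using unbuffered channels and returns one result per input.
func process(numWorkers int, inputs []int) []int {
	jobs := make(chan int)
	results := make(chan int)

	for i := 0; i < numWorkers; i++ {
		go worker(jobs, results)
	}

	// Send jobs from a separate goroutine so the caller can receive
	// results at the same time.
	go func() {
		for _, in := range inputs {
			jobs <- in
		}
		close(jobs)
	}()

	out := make([]int, 0, len(inputs))
	for range inputs {
		out = append(out, <-results)
	}
	return out
}

func main() {
	out := process(3, []int{1, 2, 3, 4, 5})
	sort.Ints(out)   // completion order is not deterministic
	fmt.Println(out) // prints [2 4 6 8 10]
}
```

Whether to rely on buffering or on a separate sender is a design choice; the buffered version in the tutorial is simpler when the number of jobs is small and known in advance.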
Handling Results
To demonstrate how to handle results from the worker pool, let's modify the previous code to store the results in a slice. Update the code in main.go as follows:
package main

import "fmt"

func main() {
	jobs := make(chan int, 100)
	results := make(chan int, 100)

	// Create 3 worker goroutines
	for i := 1; i <= 3; i++ {
		go worker(i, jobs, results)
	}

	// Add 5 jobs to the job channel
	for i := 1; i <= 5; i++ {
		jobs <- i
	}
	close(jobs)

	// Collect the results
	var output []int
	for i := 1; i <= 5; i++ {
		result := <-results
		output = append(output, result)
	}

	// Print the results
	fmt.Println("Results:", output)
}
In the updated code, we declare a slice named output to store the results. After receiving each result from the results channel, we append it to the output slice. Finally, we print the accumulated results.
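Because workers finish at different times, the values in output appear in completion order, which need not match the order in which the jobs were submitted. If the order matters, one possible approach, sketched below with hypothetical job and result types and a hypothetical processOrdered helper, is to send each job's index along with its value and write each result into the matching slot:

```go
package main

import "fmt"

// job and result are hypothetical types that carry the job's position
// so that each result can be matched to its input.
type job struct {
	index int
	value int
}

type result struct {
	index  int
	output int
}

// worker doubles each job's value and reports it under the same index.
func worker(jobs <-chan job, results chan<- result) {
	for j := range jobs {
		results <- result{index: j.index, output: j.value * 2}
	}
}

// processOrdered returns the outputs in the same order as inputs,
// regardless of which worker finishes first.
func processOrdered(numWorkers int, inputs []int) []int {
	jobs := make(chan job, len(inputs))
	results := make(chan result, len(inputs))

	for i := 0; i < numWorkers; i++ {
		go worker(jobs, results)
	}
	for i, v := range inputs {
		jobs <- job{index: i, value: v}
	}
	close(jobs)

	out := make([]int, len(inputs))
	for range inputs {
		r := <-results
		out[r.index] = r.output
	}
	return out
}

func main() {
	fmt.Println(processOrdered(3, []int{5, 1, 4})) // prints [10 2 8]
}
```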
Monitoring and Shutdown
To shut down the worker pool gracefully and detect when every worker has finished, we can use a sync.WaitGroup and a dedicated goroutine. Update the code in main.go as follows:
package main

import (
	"fmt"
	"sync"
)

func main() {
	jobs := make(chan int, 100)
	results := make(chan int, 100)
	var wg sync.WaitGroup

	// Create 3 worker goroutines
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			worker(id, jobs, results)
		}(i)
	}

	// Add 5 jobs to the job channel
	for i := 1; i <= 5; i++ {
		jobs <- i
	}
	close(jobs)

	// Close the results channel once every worker has finished
	go func() {
		wg.Wait()
		close(results)
	}()

	// Collect results until the results channel is closed
	var output []int
	for result := range results {
		output = append(output, result)
	}
	fmt.Println("Results:", output)
}
In the updated code, we first import the sync package to use the sync.WaitGroup type.
We declare a sync.WaitGroup variable named wg to keep track of the active worker goroutines. For each worker goroutine, we call wg.Add(1) before starting it, which registers one more goroutine that the WaitGroup must wait for.
Instead of starting worker directly, we now wrap each call in an anonymous function that takes the worker's id as a parameter. The deferred wg.Done() call decrements the WaitGroup counter when the worker returns.
Next, we create a new goroutine that waits for all workers to finish and then closes the results channel. Closing the channel only after wg.Wait() returns ensures that no worker can send on a closed channel.
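Finally, we modify the result collection loop to range over the results channel and append the results to the output slice. The loop ends when the channel is closed, so main no longer needs to know in advance how many results to expect.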
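The shutdown above relies on closing the jobs channel, so the pool stops only after every job has been processed. If a pool has to stop earlier, one commonly used option is the standard library's context package. The sketch below is an illustration under assumptions rather than part of the tutorial's code: the helper firstN and its parameters are hypothetical, and it cancels the pool as soon as n results have been collected (n is assumed to be at least 1).

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// worker stops when jobs is closed or ctx is cancelled, whichever
// happens first.
func worker(ctx context.Context, jobs <-chan int, results chan<- int) {
	for {
		select {
		case <-ctx.Done():
			return
		case j, ok := <-jobs:
			if !ok {
				return
			}
			select {
			case results <- j * 2:
			case <-ctx.Done():
				return
			}
		}
	}
}

// firstN is a hypothetical helper: it collects up to n results and then
// cancels the remaining work.
func firstN(n, numWorkers int, inputs []int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	jobs := make(chan int)
	results := make(chan int)
	var wg sync.WaitGroup

	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			worker(ctx, jobs, results)
		}()
	}

	// Feed jobs until they run out or the pool is cancelled.
	go func() {
		defer close(jobs)
		for _, in := range inputs {
			select {
			case jobs <- in:
			case <-ctx.Done():
				return
			}
		}
	}()

	// Close results once every worker has returned.
	go func() {
		wg.Wait()
		close(results)
	}()

	var out []int
	for r := range results {
		out = append(out, r)
		if len(out) == n {
			cancel() // tell the workers and the sender to stop
			break
		}
	}
	wg.Wait() // all workers have exited before we return
	return out
}

func main() {
	out := firstN(2, 3, []int{1, 2, 3, 4, 5, 6})
	fmt.Println(len(out)) // prints 2
}
```

Every blocking send in this sketch is paired with a ctx.Done() case, so no goroutine stays blocked after cancellation.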
Conclusion
In this tutorial, you have learned how to implement a worker pool using goroutines in Go. We started by creating a worker function that performs the actual task and then built a system to distribute jobs to the worker pool. We also explored how to handle results from the workers and monitor their progress.
By using a worker pool, you can take advantage of concurrency and parallelism to improve the performance of your Go programs. This pattern is especially useful when you have a large number of tasks to process and want to limit the number of goroutines running concurrently.
Experiment with different pool sizes and types of workloads to see how they affect throughput and to find a pool size that suits your program.
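As a starting point for such experiments, here is a minimal sketch that times the same batch of simulated jobs with several pool sizes. The helper timePool and its parameters are hypothetical, and time.Sleep stands in for real work just as it does in the tutorial's worker.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// timePool is a hypothetical helper: it runs numJobs jobs, each sleeping
// for workMillis milliseconds to simulate work, on numWorkers workers
// and returns the elapsed wall-clock time.
func timePool(numWorkers, numJobs, workMillis int) time.Duration {
	jobs := make(chan int, numJobs)
	var wg sync.WaitGroup

	start := time.Now()
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range jobs {
				time.Sleep(time.Duration(workMillis) * time.Millisecond)
			}
		}()
	}
	for j := 0; j < numJobs; j++ {
		jobs <- j
	}
	close(jobs)
	wg.Wait()
	return time.Since(start)
}

func main() {
	// Compare a few pool sizes on the same batch of 8 simulated jobs.
	for _, size := range []int{1, 2, 4, 8} {
		fmt.Printf("workers=%d elapsed=%v\n", size, timePool(size, 8, 50))
	}
}
```

With sleep-based jobs like these, the elapsed time should shrink roughly in proportion to the pool size until there are as many workers as jobs. Real workloads will behave differently, and CPU-bound work in particular tends to stop improving once the pool size approaches the number of available cores.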
Remember, Go provides powerful concurrency primitives like goroutines and channels that make it easy to build robust and efficient concurrent programs.
Happy coding!