How To Build A Concurrent Program in Go–Create a Grep clone

Learn in simple terms how concurrent programs are built in Go by creating a clone of the grep tool. Along the way, you will see which Go features are used to implement concurrency, and how to decide when and how to combine them to achieve the desired result.

We will cover goroutines, channels, mutexes, and WaitGroups.

What We Will Build

We will create a grep clone that does simple substring searches within files. It will recurse into subdirectories automatically, and we will use goroutines to search the files for matches. To run the app, we will pass the word to search for and the directory to start from as command-line arguments.

Let’s begin with a starter program made up of two packages: one that maps out the files to be searched based on the directory we pass in, and a worker package that searches each file and returns results. In the following sections we will build concurrency into this program to achieve the overall goal.

Create a folder named grepclone. This will be our project directory, so open it in your code editor. Alternatively, you can build this project on Gitpod: create a public repository on GitHub or any other Git provider, then prefix the repository URL with gitpod.io/#. For example, if your provider is GitHub and you named your repository grepclone, the URL to navigate to would be gitpod.io/#https://github.com/your-username-here/grepclone, replacing your-username-here with your actual username.

Inside your project, create a folder named worklist and, inside it, a file named worklist.go. Below is all the code we need in worklist.go.

package worklist

type Entry struct {
    Path string
}

type Worklist struct {
    jobs chan Entry
}

func (w *Worklist) Add(work Entry) {
    w.jobs <- work
}

func (w *Worklist) Next() Entry {
    j := <-w.jobs
    return j
}

func New(bufSize int) Worklist {
    return Worklist{make(chan Entry, bufSize)}
}

func NewJob(path string) Entry {
    return Entry{path}
}

// Terminate workers by passing empty path to each
func (w *Worklist) Finalize(numWorkers int) {
    for i := 0; i < numWorkers; i++ {
        w.Add(Entry{""})
    }
}

The code above defines a Worklist structure for managing the list of jobs that will later be processed by our workers. In this kind of scenario, a job is a task (or an instance of a specific kind of task) handled in the background by a function running concurrently with the main thread; the function that handles a job is called a worker. In our case, the jobs are the paths of the files to be searched, and we use a channel to hold them. With:

jobs chan Entry

We’re saying the jobs field holds a channel that carries values of type Entry; more on channels later. The next two receiver functions Add work to the worklist and return the Next job in line from it. Following those are functions that create new instances of Worklist and Entry, and a Finalize function that signals our workers to terminate by sending each of them an entry with an empty path.

Before continuing, initialize your project with the command:

go mod init grepclone

Next, create another directory on the same level as worklist, and name it worker. This will contain our worker package. Inside the directory create a file worker.go, and copy or type the code below into it:

package worker

import (
    "bufio"
    "fmt"
    "os"
    "strings"
)

type Result struct {
    Line    string
    LineNum int
    Path    string
}

type Results struct {
    Inner []Result
}

func NewResult(line string, lineNum int, path string) Result {
    return Result{line, lineNum, path}
}

func FindInFile(path string, find string) *Results {
    file, err := os.Open(path)
    if err != nil {
        fmt.Println("Error:", err)
        return nil
    }
    defer file.Close()

    results := Results{make([]Result, 0)}

    scanner := bufio.NewScanner(file)
    lineNum := 1
    for scanner.Scan() {
        if strings.Contains(scanner.Text(), find) {
            r := NewResult(scanner.Text(), lineNum, path)
            results.Inner = append(results.Inner, r)
        }
        lineNum += 1
    }
    if len(results.Inner) == 0 {
        return nil
    } else {
        return &results
    }
}

The FindInFile function in the worker package takes as arguments a file path and the string to search for within that file, and returns a pointer to a Results value (or nil when the file cannot be opened or contains no matches). Each Result holds the line where a match was found, the line number of that line, and the path of the file.

Goroutines

In the planning of our program, we decided it is important for it to do these two things:

  • Run as fast as possible

  • Print results to the terminal in real time, i.e. as matches are found

In order to print results in real time and drastically reduce how long our program takes to complete, we need different parts of it to run at the same time instead of waiting on each other unnecessarily. This is called concurrency. Concurrency is a first-class citizen of the Go language, which means it is straightforward to tell the Go runtime that you want a function to run alongside the rest of the program. Go allows this through goroutines: you run a function concurrently by preceding the call with the go keyword, like so:

var myFunc = func() {
    // Do something
}

go myFunc()

Our grepclone program executes three major tasks, each handled by a different function: one gathers all files under the directory we pass in, the second searches those files for matches, and the third prints out the results. In a single-threaded program, each of these parts would have to run to completion before the next could start. Here, however, we will use goroutines to run all three concurrently.

Create a folder grepclone at the same level as your worklist and worker folders, then add a main.go file inside it and put the code below into it.

package main

import (
    "fmt"
    "grepclone/worker"
    "grepclone/worklist"
    "os"
    "path/filepath"
)

func getAllFiles(wl *worklist.Worklist, path string) {
    entries, err := os.ReadDir(path)
    if err != nil {
        fmt.Println("readdir error:", err)
        return
    }
    for _, entry := range entries {
        if entry.IsDir() {
            nextPath := filepath.Join(path, entry.Name())
            getAllFiles(wl, nextPath)
        } else {
            wl.Add(worklist.NewJob(filepath.Join(path, entry.Name())))
        }
    }
}

func main() {

    wl := worklist.New(100)

    numWorkers := 10

    // Get all files
    go func() {
        getAllFiles(&wl, os.Args[2])
        wl.Finalize(numWorkers)
    }()

    // Find Matches
    for i := 0; i < numWorkers; i++ {
        go func() {
            for {
                workEntry := wl.Next()
                if workEntry.Path != "" {
                    // Results handling will be added in the next section;
                    // for now, just run the search.
                    worker.FindInFile(workEntry.Path, os.Args[1])
                } else {
                    // When the path is empty, this indicates that there are no more jobs available,
                    // so we quit.
                    return
                }
            }
        }()
    }


    go func() {
        // Print results
    }()
}

There’s a lot going on in the code above, so let’s walk through it. Notice that we have three goroutines, each carrying out different logic. First, we created the function getAllFiles, which takes a pointer to the Worklist structure from our worklist package and a file path as a string; it adds work (files) to our worklist, recursing into subdirectories as it goes. We then spawn a goroutine inside which this function is called.

Next, notice that we created a numWorkers variable set to 10, and then spawned that number of goroutines in a for loop. Inside each one, we call our worker function worker.FindInFile, passing it the next entry. In other words, instead of having a single goroutine handle all the jobs (entries), we create ten that stand by to handle jobs as they arrive. Depending on our needs, we can scale the number of workers up or down; more workers generally means the program finishes faster when there are many files to process, up to the limits of the machine.

Lastly, after the loop, we have the goroutine where we print results. Since all three parts of the program run concurrently, our current code has no way for one part to hand its output to another. For example, the last goroutine, which prints results, will not wait for the workers to find and return results before it starts. We need a way for these goroutines to communicate data to each other while they run.

Channels

Go allows goroutines to communicate with each other through channels. Channels are like narrow pipes through which one goroutine sends data to another while both are running. To create a channel, use the built-in make function, passing it the chan keyword, the type of data the channel will carry, and optionally a buffer size for a buffered channel. Here is how:

chanOne := make(chan int)

// buffered channel with buffer size of 10
chanTwo := make(chan int, 10)

Both channels above carry values of type int. A send on a buffered channel only blocks once the buffer is full; a send on an unbuffered channel blocks until another goroutine is ready to receive.

We use the arrow <- syntax to read and write to channels. The arrow points to our channel name when we write to it:

chanOne <- 5

And points out of the channel when we read from it:

rowNum := <- chanOne

In our program, we are already using channels as a way to pass entries(file paths) from our getAllFiles function to our workers for processing. Recall how in worklist we set jobs to be a channel like so:

type Worklist struct {
    jobs chan Entry
}

And used Next to get out the next entry from the channel:

func (w *Worklist) Next() Entry {
    j := <-w.jobs
    return j
}

Now, in our worker goroutines, we are going to send our results through a channel so they can be received by the goroutine that prints them. We will create a channel called results, and whenever FindInFile returns matches, we will loop through them and send each one into the channel. Make the following adjustments to your code:

package main

import (
    "fmt"
    "grepclone/worker"
    "grepclone/worklist"
    "os"
    "path/filepath"
)

func getAllFiles(wl *worklist.Worklist, path string) {
    entries, err := os.ReadDir(path)
    if err != nil {
        fmt.Println("readdir error:", err)
        return
    }
    for _, entry := range entries {
        if entry.IsDir() {
            nextPath := filepath.Join(path, entry.Name())
            getAllFiles(wl, nextPath)
        } else {
            wl.Add(worklist.NewJob(filepath.Join(path, entry.Name())))
        }
    }
}

func main() {

    wl := worklist.New(100)

    results := make(chan worker.Result, 100)

    numWorkers := 10

    go func() {
        getAllFiles(&wl, os.Args[2])
        wl.Finalize(numWorkers)
    }()

    for i := 0; i < numWorkers; i++ {
        go func() {
            for {
                workEntry := wl.Next()
                if workEntry.Path != "" {
                    workerResult := worker.FindInFile(workEntry.Path, os.Args[1])
                    if workerResult != nil {
                        for _, r := range workerResult.Inner {
                            results <- r
                        }
                    }
                } else {
                    // When the path is empty, this indicates that there are no more jobs available,
                    // so we quit.
                    return
                }
            }
        }()
    }

    go func() {
        for {
            select {
            case r := <-results:
                fmt.Printf("%v[%v]:%v\n", r.Path, r.LineNum, r.Line)
            }
        }
    }()
}

The new additions here are the results channel, the loop in each worker that sends matches into it, and the receive inside the print goroutine.

In the last part, we use the select keyword to wait on the channel. select lets a goroutine wait on multiple channel operations at once: it runs whichever case is ready, blocks when none are ready, and picks randomly among the ready cases when more than one is ready.

WaitGroup

We need to wait for our worker and getAllFiles goroutines to finish before we can return from our print-results goroutine. A WaitGroup lets us wait for a collection of goroutines to finish before running code that depends on them. WaitGroups are provided by Go's synchronization package, sync. To create one, simply declare a variable of type sync.WaitGroup with the var keyword. That variable then gives you access to the three WaitGroup methods:

  • .Add: Increments the WaitGroup counter, usually by one, to register that a new goroutine is starting.

  • .Done: Decrements the counter by one to signal that a goroutine has finished.

  • .Wait: Blocks until the counter drops to zero; call it just before the code that needs to wait for the group.

Let us go ahead and modify our code so that our print-results goroutine waits for our workers and the getAllFiles goroutine before terminating:

package main

import (
    "fmt"
    "grepclone/worker"
    "grepclone/worklist"
    "os"
    "path/filepath"
    "sync"
)

func getAllFiles(wl *worklist.Worklist, path string) {
    entries, err := os.ReadDir(path)
    if err != nil {
        fmt.Println("readdir error:", err)
        return
    }
    for _, entry := range entries {
        if entry.IsDir() {
            nextPath := filepath.Join(path, entry.Name())
            getAllFiles(wl, nextPath)
        } else {
            wl.Add(worklist.NewJob(filepath.Join(path, entry.Name())))
        }
    }
}

func main() {

    var workersWg sync.WaitGroup

    wl := worklist.New(100)

    results := make(chan worker.Result, 100)

    numWorkers := 10

    workersWg.Add(1)
    go func() {
        defer workersWg.Done()
        getAllFiles(&wl, os.Args[2])
        wl.Finalize(numWorkers)
    }()

    for i := 0; i < numWorkers; i++ {
        workersWg.Add(1)
        go func() {
            defer workersWg.Done()
            for {
                workEntry := wl.Next()
                if workEntry.Path != "" {
                    workerResult := worker.FindInFile(workEntry.Path, os.Args[1])
                    if workerResult != nil {
                        for _, r := range workerResult.Inner {
                            results <- r
                        }
                    }
                } else {
                    // When the path is empty, this indicates that there are no more jobs available,
                    // so we quit.
                    return
                }
            }
        }()
    }

    blockWorkersWg := make(chan struct{})
    go func() {
        workersWg.Wait()
        // Close channel
        close(blockWorkersWg)
    }()

    var displayWg sync.WaitGroup

    displayWg.Add(1)

    go func() {
        for {
            select {
            case r := <-results:
                fmt.Printf("%v[%v]:%v\n", r.Path, r.LineNum, r.Line)
            case <-blockWorkersWg:
                // Make sure channel is empty before aborting display goroutine
                if len(results) == 0 {
                    displayWg.Done()
                    return
                }
            }
        }
    }()
    displayWg.Wait()
}

Take notice of where we have placed the .Add calls, and how we use the defer keyword when calling .Done: defer guarantees that the WaitGroup counter is decremented no matter what happens in the rest of the function.

The above code might have gotten confusing from this point:

    blockWorkersWg := make(chan struct{})
    go func() {
        workersWg.Wait()
        // Close channel
        close(blockWorkersWg)
    }()

Let’s explain. We cannot simply call workersWg.Wait() inside the print-results goroutine to wait for the workers, because that would block the goroutine from doing anything at all. We need a non-blocking way for it to wait on the other goroutines. We do this by signaling the print-results goroutine through the channel blockWorkersWg: we created a separate goroutine whose sole purpose is to wait on workersWg and then close blockWorkersWg. That closing action is the only event that makes the blockWorkersWg case ready to be selected and its block of code run:

            case <-blockWorkersWg:
                // Make sure all results have been printed before terminating the goroutine
                if len(results) == 0 {
                    displayWg.Done()
                    return
                }

Inside it, we simply return after confirming that all results have indeed been printed.

At this point, you should have complete working code. Time to test it. Run the command below, and you should get the file path, matching line, and line number for every place the word “file” appears under the current directory:

go run grepclone/main.go file ./

Experiment with different words and different file paths.

Mutex

A mutex provides mutual exclusion: a way to restrict a chunk of code so that only one goroutine can run it at a time. Like WaitGroups, it is part of Go's sync package. A mutex can sometimes be used in place of channels when working with goroutines, but the two are not interchangeable; each works better in different situations. Channels are better suited to goroutines that need to communicate with each other. If instead your program only needs to ensure that no more than one goroutine modifies the same data at the same time, avoiding race conditions, then all you need is a mutex that locks the section of code modifying that data to one goroutine at a time. A common mistake is to reach for channels purely for synchronization when the goroutines are not really communicating.

In our program we used channels because we wanted to print results in real time, so our workers needed to communicate results to a separate goroutine that prints them as they are found. If we did not need real-time printing, we could replace the results channel with a plain slice protected by a mutex. Let’s give it a try. Before that, make sure you can revert to the current, channel-based version of your code. Now, replace the contents of your main.go file with:

package main

import (
    "fmt"
    "grepclone/worker"
    "grepclone/worklist"
    "os"
    "path/filepath"
    "sync"
)

func getAllFiles(wl *worklist.Worklist, path string) {
    entries, err := os.ReadDir(path)
    if err != nil {
        fmt.Println("readdir error:", err)
        return
    }
    for _, entry := range entries {
        if entry.IsDir() {
            nextPath := filepath.Join(path, entry.Name())
            getAllFiles(wl, nextPath)
        } else {
            wl.Add(worklist.NewJob(filepath.Join(path, entry.Name())))
        }
    }
}

func main() {

    var workersWg sync.WaitGroup

    var mu sync.Mutex

    wl := worklist.New(100)

    var results []worker.Result

    numWorkers := 10

    workersWg.Add(1)
    go func() {
        defer workersWg.Done()
        getAllFiles(&wl, os.Args[2])
        wl.Finalize(numWorkers)
    }()

    for i := 0; i < numWorkers; i++ {
        workersWg.Add(1)
        go func() {
            defer workersWg.Done()
            for {
                workEntry := wl.Next()
                if workEntry.Path != "" {
                    workerResult := worker.FindInFile(workEntry.Path, os.Args[1])
                    if workerResult != nil {
                        mu.Lock()
                        results = append(results, workerResult.Inner...)
                        mu.Unlock()
                    }
                } else {
                    // When the path is empty, this indicates that there are no more jobs available,
                    // so we quit.
                    return
                }
            }
        }()
    }

    workersWg.Wait()
    for _, r := range results {
        fmt.Printf("%v[%v]:%v\n", r.Path, r.LineNum, r.Line)
    }
}

The changes from the channel version are the mutex mu, the results slice, and the locked append. We use the Lock and Unlock methods around the section of code that modifies results, so only one of the ten workers can run that section at any point in time. Test and confirm that your code works as expected.

The End

In this article, we built a truly concurrent program by combining the powers of goroutines, channels, and WaitGroups, and we saw when a mutex is the better choice for synchronization instead of channels.