Welcome to SebWalak.com

Metronome's Cacophony (5/14) - concurrency in Go (GoLang) - WaitGroup

Asynchronous approach (ctd)

WaitGroup

Most asynchronous activities need a mechanism to signal completion, with the option to deliver their outcome. We do this so that we can gather the results of these Goroutines, or simply proceed once their effects are in place, which includes the graceful application termination scenario.

Note: I am saying most and not all, because there is also the “fire and forget” model, where we are not bothered about this signal or the outcome, at least not in the immediate time-frame (I would put UDP here as an example).

One of Go’s tools you can use to coordinate task execution is sync.WaitGroup. It is particularly good for signalling completion of one or more tasks.

I’m going to modify the previous “shower” example so that, instead of an arbitrary wait of 1 second, it waits only as long as it needs to.

package main

import (
   "log"
   "sync"
)

var groupActivities sync.WaitGroup

func do(activity string) {
   groupActivities.Add(1)
   go func() {
      log.Println(activity)
      groupActivities.Done()
   }()
}

func main() {
   do("walk into bathroom")
   do("undress")
   do("take shower")
   do("put pyjamas on")
   do("say good night to your host")

   groupActivities.Wait()

   log.Println("program ended")
}

Output:

2018/05/01 22:27:28 say good night to your host
2018/05/01 22:27:28 undress
2018/05/01 22:27:28 walk into bathroom
2018/05/01 22:27:28 put pyjamas on
2018/05/01 22:27:28 take shower
2018/05/01 22:27:28 program ended

Process finished with exit code 0

We didn’t have to guess when the tasks ended. We could take a shower for as long as needed, albeit in PJs.

This fairly typical scenario creates a number of Goroutines within the function do. Before each go statement (@line 12), we increment the WaitGroup’s internal task counter by 1. At the end of the Goroutine (but still within the Goroutine!) we have to invoke WaitGroup.Done() to decrement this counter. @line 25 we want to know that all Goroutines have completed before we terminate the program. A single call to WaitGroup.Wait() ensures that we won’t progress past this point until all of them are finished, because WaitGroup.Wait() blocks execution until the internal counter reaches 0.

Note: It is important to call WaitGroup.Add() before we start the Goroutine! Otherwise, the code may reach the WaitGroup.Wait() line before the increment happens. The internal counter will then not represent the number of tasks, and WaitGroup.Wait() won’t wait for some of them. It is also important to remember the call to WaitGroup.Done(), as omitting it means the internal counter never reaches 0, so the call to WaitGroup.Wait() blocks forever, causing a deadlock. WaitGroup.Done() has to be placed at the very end (after any cleanup) and inside the Goroutine, because that is the right moment to indicate the task’s completion. Placing it immediately after the go statement, outside the Goroutine, will most likely decrement the counter prematurely, which means WaitGroup.Wait() may return while the Goroutine is still running; if the program then exits, that lingering Goroutine is terminated abruptly.
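A minimal sketch of the safe ordering from the note, using a hypothetical runTasks helper: Add() is called in the launching code before the go statement, and Done() is deferred as the very first statement of the Goroutine. Since deferred calls run in reverse order of registration, Done() then fires only after the task’s (stand-in) cleanup, no matter where the task returns from. The `defer wg.Done()` form is a common Go idiom for exactly this reason.

```go
package main

import (
	"fmt"
	"sync"
)

// runTasks is a hypothetical helper that starts n tasks, waits for them,
// and reports how many had finished their cleanup when Wait() returned.
func runTasks(n int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	cleanedUp := 0

	for i := 0; i < n; i++ {
		// Add() happens here, before "go", so Wait() below can never
		// observe a counter that is lower than the number of tasks.
		wg.Add(1)
		go func() {
			// Deferred first, so it runs last: after the cleanup below.
			defer wg.Done()
			defer func() {
				// stand-in for real cleanup (closing files, connections...)
				mu.Lock()
				cleanedUp++
				mu.Unlock()
			}()
			// ... the task's actual work would go here ...
		}()
	}

	wg.Wait() // returns only after every task has run its cleanup
	mu.Lock()
	defer mu.Unlock()
	return cleanedUp
}

func main() {
	fmt.Println(runTasks(5)) // prints 5
}
```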

Java analogy: If you are using Java 1.5+ you may be lucky, as a few APIs help you achieve a similar effect. java.util.concurrent.Semaphore maintains an internal count which can be incremented (.release()), decremented (.acquire()) and used to wait for all operations to complete (.acquire(numberOfTasks)). It seems reversed because the Semaphore conceptually maintains a count of available permits rather than a count of busy threads. java.util.concurrent.CountDownLatch is a non-resettable countdown. Both CountDownLatch and Semaphore have to be initialised with a count up-front. Also consider java.util.concurrent.CyclicBarrier; just keep in mind that it too needs to know the number of threads up-front. Starting with Java 1.7 there is also java.util.concurrent.Phaser, which is the most flexible construct. As you will see in the docs, each of the above is way more complex than Go’s WaitGroup.

Naive solution that knows when the work is done

We can now try using this fresh knowledge to slightly improve our last solution. We already know that more rework will be needed, but this small iteration may help visualise the effect.

func(bpm param.Bpm, performer metronome.BeatPerformer) {

    // declare the WaitGroup (its zero value is ready to use, no initialisation needed)
    taskGroup := sync.WaitGroup{}

    // remember that the deferred call will be executed upon exiting this function:
    // WaitGroup, wait for all tasks to invoke .Done()
    defer taskGroup.Wait()

    ticker := time.NewTicker(bpm.Interval())
    defer ticker.Stop()

    for beatCount := 0; beatCount < numberOfBeats; beatCount++ {

        // WaitGroup, make a note of additional task I want to wait for
        taskGroup.Add(1)
        go func(beatCount int) {
            volume := volumeMeter()
            performer(beatCount, volume)
            
            // WaitGroup, this task is done, no need to wait for it
            taskGroup.Done()
        }(beatCount)

        <-ticker.C
    }
}

Note: the above function uses defer twice. To clarify: deferred calls are placed on a stack, and stacks are LIFO (last in, first out) structures. This means that upon exiting this function, ticker.Stop() will be called first, followed by taskGroup.Wait().
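The ordering can be checked in isolation with a small sketch. The hypothetical deferOrder function below defers two closures in the same order as the metronome function; the strings are only labels standing in for taskGroup.Wait() and ticker.Stop(), not the real calls:

```go
package main

import "fmt"

// deferOrder records the order in which deferred calls run.
// It mirrors the metronome function: "Wait" is deferred first, "Stop" second.
func deferOrder() []string {
	var calls []string
	func() {
		defer func() { calls = append(calls, "taskGroup.Wait()") }()
		defer func() { calls = append(calls, "ticker.Stop()") }()
		calls = append(calls, "loop body")
	}() // both deferred closures run here, last registered first
	return calls
}

func main() {
	for _, c := range deferOrder() {
		fmt.Println(c)
	}
	// prints:
	// loop body
	// ticker.Stop()
	// taskGroup.Wait()
}
```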

The naive metronome function will produce a trace similar to this:

Asynchronous - naive attempt which at least waits for completion


So it is still a mess, but at least we get to capture every single bit of it thanks to WaitGroup.

Conclusion

I think it would be fair to state that this tiny, but malfunctioning and time-gobbling, solution needs to end up in the bin.

I want to redo the whole structure of our metronome’s engine by creating a separate, single Goroutine with a life-span nearly equal to that of the application. That Goroutine would continuously measure the volume and, once the latest volume is known, update a shared variable. The main timing loop could then read that shared variable to adjust the volume of each beat performance.
