welcome at SebWalak.com

Metronome's Cacophony (3/14) - concurrency in Go (GoLang) - Ticker

Synchronous approach (ctd)

Ticker

Ticker is a very convenient tool used to schedule tasks to happen at regular interval. It is created with an invocation of time.NewTicker and needs a time.Duration parameter. It will create a channel, which will be receiving time.Time messages at intervals equal this duration. Ticker channel is available at ticker.C.

Simple example:

package main

import (
  "time"
  "fmt"
)

func main() {
   // make note of the time program started (so we can stop it after elapsed time
   start := time.Now()

   // create Ticker which will send messages with current time to channel Ticker.C every 111ms
   ticker := time.NewTicker(111 * time.Millisecond)

   // drain channel from messages until the channel is closed (never going to happen here!)
   for t := range ticker.C {

      // print t (of type time.Time), which is the next message received from the channel
      fmt.Println(t)

      // if the code run for over a second, break loop
      if time.Since(start) > time.Second {
         break
      }
   }

   // this will stop the Ticker from sending more messages
   ticker.Stop()
}

Output from the above example:

2018-05-01 12:55:38.265970476 +0100 BST m=+0.111373928
2018-05-01 12:55:38.37699173 +0100 BST m=+0.222395179
2018-05-01 12:55:38.488017207 +0100 BST m=+0.333420642
2018-05-01 12:55:38.599015454 +0100 BST m=+0.444418887
2018-05-01 12:55:38.709945783 +0100 BST m=+0.555349266
2018-05-01 12:55:38.821024122 +0100 BST m=+0.666427555
2018-05-01 12:55:38.93198204 +0100 BST m=+0.777385485
2018-05-01 12:55:39.043010703 +0100 BST m=+0.888414136
2018-05-01 12:55:39.153975803 +0100 BST m=+0.999379252
2018-05-01 12:55:39.26494749 +0100 BST m=+1.110350972

Process finished with exit code 0

Note: there is also time.Tick available, which is a convenience wrapper for time.NewTicker but it does not allow for Ticker shutdown and, as such, might cause memory leaks. In some cases it will be sufficient but be careful with it. To be frank, the channel Ticker.C also cannot be closed, even by invoking Ticker.Stop() (look at comment to Stop() method in the source). By using time.NewTicker & Ticker.Stop() however, you can at least stop dispatch of further messages.

Note: expect to read more about the channels towards the end of this series. In this context they are just a side-kick, while normally they are the superheroes of Go concurrency models.

One more example for running a piece of code once, after given interval:

package main

import (
   "time"
   "fmt"
)

func main() {
   // print current time so that we can see proof of how long the execution took
   fmt.Println(time.Now())

   // Ticker will send first message after a second
   ticker := time.NewTicker(time.Second)

   // this will stop a Ticker once this function ends
   // it is a Go's idiomatic approach to defer cleanup calls 
   // on resources as close to where they are created 
   defer ticker.Stop()

   // this will block further execution until message arrives
   <- ticker.C

   fmt.Println(time.Now())
}

Output:

2018-05-01 13:26:26.75039728 +0100 BST m=+0.000190487
2018-05-01 13:26:27.750577483 +0100 BST m=+1.000370695

Process finished with exit code 0

Java analogy: java.util.Timer in conjunction with java.util.TimerTask will get you similar functionality to that of a Go Ticker. You can even specify the associated thread name, run the timer’s thread as daemon or remove cancelled tasks from queue. I don’t feel like I am missing out yet.

Synchronous scenario with Ticker timing

Now that we know a bit more about Ticker, let’s see how we can employ it to provide, yet another, solution to our problem.

func(bpm param.Bpm, performer metronome.BeatPerformer) {  
  
   beatCount := 0  
  
   //create Ticker instance, initialised with desired interval  
   ticker := time.NewTicker(bpm.Interval())  
     
   //defer ensures the Ticker stops sending messages after we leave this function (sic! function, not block)
   defer ticker.Stop()  
  
   //for each Ticker's timing message  
   for range ticker.C {  
  
      volume := volumeMeter()  
  
      performer(beatCount, volume)  
  
      if beatCount++; beatCount >= numberOfBeats {  
         break  
      }  
   }  
}

Ok, that reads quite different to previous scenarios.

I have replaced the beat counting for-loop from previous solution with this approach, so I can utilise a handy construct.

The internals of Ticker sends one message per interval to its channel. We use for range channel construct to drain this channel. It means it will keep fetching messages until the channel is empty. The loop won’t break out until the channel is closed or there is an explicit instruction (such as break in this case). If there are no messages in the channel but the channel remains open, this construct will block execution. That’s why there is no explicit time.Sleep() invocation.

Let’s be optimistic and try this code with the good sampler first. Execution timeline

Synchronous execution with ticker - not firing immediately

Synchronous execution with ticker - not firing immediately

Spot on frequency! However, there is some strange gap before the first volume measurement happens. There seems to be a delay of about an interval duration where I didn’t expect it.

I will point out that the Ticker does not send its first message as soon as time.NewTicker is invoked. The first message will be sent after given interval.

Armed with the knowledge of Tickers and some basic appreciation of how channels work we can address this.

Synchronous scenario with immediately firing Ticker

func(bpm param.Bpm, performer metronome.BeatPerformer) {  
  
   ticker := time.NewTicker(bpm.Interval())  
   defer ticker.Stop()  
  
   for beatCount := 0; beatCount < numberOfBeats; beatCount ++ {  
  
      volume := volumeMeter()  

      //my preference is to wait for the right time here, with pre-fetched volume  
      // because timing feels more important in our case than freshness of volume
      <-ticker.C  
  
      performer(beatCount, volume)  
  
      //<-ticker.C //alternative position  
   }  
}

I’ve removed for range channel construct because it does not allow for any tweaking, which we need.

I’ve used a channel receive operator <- channel which you have seen in the Ticker example in previous section. In the exact form here it will block, until there is a message, or the channel is closed. Because it is an expression, it returns value(s) but we are not interested in them here (more about this later). We just want to wait until the time is just right to perform a beat.

Note: I could have decided to place <- channel in line 16, just after the performer invocation. Doing so however, will mean that the volume reading will be as fresh as possible but the beat rate will fluctuate. I feel a metronome should, first and foremost, be about frequency though.

Chart for the above example:

Synchronous execution with ticker firing immediately

Synchronous execution with ticker firing immediately

Good result. But let’s test it with worse volume sampler.

Same code, just with oscillating execution time of volume sampler:

Synchronous execution with ticker firing immediately - oscillating execution time

Synchronous execution with ticker firing immediately - oscillating execution time

Reminds me of execution #4 with adaptive delay.

Because we have the power to simulate various conditions, let’s take a look at execution chart when simulating random execution time of volume sampler:

Synchronous execution with ticker firing immediately - random execution time

Synchronous execution with ticker firing immediately - random execution time

You can see for yourself that this problem may be harder to solve than expected. We may need to do something extraordinary. Or do we?

Conclusions, so far

The substandard performance of our volume sampler is analogical to what happens in real-life when solving problems other than made-up, like this one. Most common type of lag we have to deal with on a daily basis is probably caused by narrow network bandwidth or overloaded servers (IO bottleneck, CPU load, etc.). Even locally we experience slowdowns, for example when writing to disk. The speed is usually much greater than that of network but the volume of data is also greater. We also expect disk writes to be instant but we got used to slower network. Even on lower level, our programs communicate with local hardware components where data bus may be slow and unreliable. Go can be used in all these areas. We need to learn Go’s concurrency tool-set to build software that deals properly with overrunning tasks.

Looking at all the charts so far, we can see that when the volume measurement was sluggish, it renders our product useless. And I don’t mean to say that product slightly worse, I actually mean useless. We need to limit the impact this volume measurement has on the program.

All solutions to this point were synchronous. Every beat was preceded by volume measurement (apart from the solution to deprecated set of requirements). The problem is that the volume measurement can take even 2.5 times the interval length (that’s what the simulation does). There is no magic here, we need to pull the long-running activities out, and get them to execute alongside the timing loop but without the damaging impact on other parts of the application.

Go is equipped with just the right enabler - Goroutines. Let’s learn how to use it.

Thumbnail