A Watch Channel In Go

8 November 2024

In this post I want to share a design pattern for a simple broadcast/watch channel in Go. I'm a relative golang newcomer and I've been learning how to do certain things that I already have a solution for in Rust—in this case tokio's watch channel, which provides updates of some value to an arbitrary number of receivers.

As we know, channels in Go do not broadcast. You can have multiple receivers, but a value that is sent will only go to one of them. The documentation notes that closing a channel can serve as a broadcast of sorts: unlike a regular send, a close is observed by every receiver. This is a useful hint that makes the pattern easier to build.

You could build a broadcast channel more generally by using a subscribe/unsubscribe pattern where values are fanned out to each receiver explicitly. This is a bit of a hassle, though, and we can avoid that complexity by stipulating certain design criteria.

So long as we're satisfied with these rules, this is relatively straightforward to implement.

type Sender struct {
	value      string
	lock       sync.Mutex
	changeWait chan bool
}

The sender needs three things: the current value that is being subscribed to, a lock to protect reading and writing of that value, and a channel to notify of updates. The value could be anything—here I've arbitrarily chosen a string.
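One detail the struct alone does not show is that changeWait has to be initialised before first use. A zero-value Sender has a nil channel: receiving from a nil channel blocks forever, and closing one panics. A small constructor takes care of this. The name NewSender is my own assumption for this sketch, not something defined in the post.

```go
package main

import (
	"fmt"
	"sync"
)

// Sender mirrors the struct from the post.
type Sender struct {
	value      string
	lock       sync.Mutex
	changeWait chan bool
}

// NewSender is a hypothetical constructor. It installs the first changeWait
// channel so that subscribers never wait on a nil channel and the first
// update never tries to close one.
func NewSender(initial string) *Sender {
	return &Sender{
		value:      initial,
		changeWait: make(chan bool),
	}
}

func main() {
	s := NewSender("hello")
	fmt.Println(s.value, s.changeWait != nil) // prints: hello true
}
```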

Here is the method that will allow somebody to subscribe to updates:

func (s *Sender) Subscribe() (string, chan bool) {
	s.lock.Lock()
	defer s.lock.Unlock()
	return s.value, s.changeWait
}

So far, so simple. It's important here that the lock protects access to both the value and the changeWait channel, so that the two are always read as a pair.

The subscriber should use it like this.

for {
	value, changed := sender.Subscribe()
	// use value here
	<-changed
}

This is how we avoid tracking long-term subscribers: there is no such thing as a long-term subscriber, only code that calls Subscribe() in a loop every time it is interested in the next value.

Next let's look at how the sender will update its value internally.

func (s *Sender) update(newValue string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.value = newValue
	close(s.changeWait)
	s.changeWait = make(chan bool)
}

We achieve the broadcast to all interested receivers by closing the channel. We create a new one and install this replacement while the mutex is still locked. This way, a receiver that calls Subscribe() always gets a consistent view; the value is always paired with the correct channel instance.
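Putting the pieces together, the following is a sketch of a complete program under a few assumptions: NewSender and watchUntil are hypothetical helpers added for the example, and the subscriber stops once it sees a sentinel value. Because this is a watch channel rather than a queue, the subscriber may skip intermediate values, but it always ends up with the latest one.

```go
package main

import (
	"fmt"
	"sync"
)

type Sender struct {
	value      string
	lock       sync.Mutex
	changeWait chan bool
}

// NewSender is a hypothetical constructor that creates the first channel.
func NewSender(initial string) *Sender {
	return &Sender{value: initial, changeWait: make(chan bool)}
}

// Subscribe returns the current value paired with the channel that will be
// closed when that value is replaced.
func (s *Sender) Subscribe() (string, chan bool) {
	s.lock.Lock()
	defer s.lock.Unlock()
	return s.value, s.changeWait
}

// update stores a new value, wakes all waiters, and installs a fresh channel.
func (s *Sender) update(newValue string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.value = newValue
	close(s.changeWait)
	s.changeWait = make(chan bool)
}

// watchUntil (hypothetical) records each value it observes and returns once
// it has seen stop.
func watchUntil(s *Sender, stop string) []string {
	var seen []string
	for {
		value, changed := s.Subscribe()
		seen = append(seen, value)
		if value == stop {
			return seen
		}
		<-changed
	}
}

func main() {
	s := NewSender("v0")
	result := make(chan []string)
	go func() { result <- watchUntil(s, "done") }()

	for _, v := range []string{"v1", "v2", "done"} {
		s.update(v)
	}

	seen := <-result
	// The last entry is always "done"; which earlier values were observed
	// depends on scheduling.
	fmt.Println(seen[len(seen)-1])
}
```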

For example, this by comparison would not work:

for {
	value := sender.Value()
	// use value
	<-sender.Changed()
}

If value was updated between the calls to Value() and Changed() then the receiver would miss it.
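The missed update can be reproduced deterministically in a single goroutine. In this sketch, Value() and Changed() are my guess at what the separate accessors would look like, and NewSender and missedUpdate are hypothetical helpers. The update is forced to land exactly between the two calls.

```go
package main

import (
	"fmt"
	"sync"
)

type Sender struct {
	value      string
	lock       sync.Mutex
	changeWait chan bool
}

// NewSender is a hypothetical constructor that creates the first channel.
func NewSender(initial string) *Sender {
	return &Sender{value: initial, changeWait: make(chan bool)}
}

// Value and Changed are assumed accessors that each take the lock separately.
func (s *Sender) Value() string {
	s.lock.Lock()
	defer s.lock.Unlock()
	return s.value
}

func (s *Sender) Changed() chan bool {
	s.lock.Lock()
	defer s.lock.Unlock()
	return s.changeWait
}

func (s *Sender) update(newValue string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.value = newValue
	close(s.changeWait)
	s.changeWait = make(chan bool)
}

// missedUpdate reads the value, lets an update slip in, then fetches the
// channel. It reports the value the receiver holds and whether a receive on
// the channel would block.
func missedUpdate() (held string, wouldBlock bool) {
	s := NewSender("a")
	held = s.Value()
	s.update("b") // lands between Value() and Changed()
	changed := s.Changed()
	select {
	case <-changed:
		return held, false
	default:
		return held, true
	}
}

func main() {
	fmt.Println(missedUpdate()) // prints: a true
}
```

The receiver is left holding "a" and waiting on a channel that will only close on the next update, so it never reacts to "b".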

This would work somewhat better:

for {
	changed := sender.Changed()
	value := sender.Value()
	// use value
	<-changed
}

However, this could spuriously double-fire if a new value were introduced between the calls to Changed() and Value(). The subscriber has already seen the latest value, but it picked up a channel that has since been closed, so it will loop around and get the same value again. Depending on the application this may or may not be a significant problem.

I have no doubt this pattern (or something better) has been used many times in Go software before; however, I didn't stumble across anything like it when I was researching the problem. Perhaps this write-up will save somebody else some time!


Serious Computer Business Blog by Thomas Karpiniec