This blog post provides insights into Golang’s concurrent programming features. It delves into the implementation of actors, independent entities communicating through messages. We do a walkthrough over a simple actor system implementation in go, which showcases actor creation, message sending, and concurrent message processing, highlighting the principles of the actor model.
Asynchronous Programming and the Actor Model in Golang
In the world of concurrent and parallel programming, asynchronous programming and the actor model stand out as powerful paradigms. Golang, with its built-in support for concurrency through goroutines
and channels, provides an excellent and intuitive environment for implementing these concepts. In this blog post, we’ll explore asynchronous programming and the actor model in Golang using a sample program.
Understanding the Actor Model
The actor model is a computer science and mathematical model for concurrent computation. It defines a set of rules for actors, which are independent entities that communicate by sending messages to each other. Each actor has its own state, and the only way to interact with an actor is by sending it a message. Actors operate independently and can create new actors or decide how to handle incoming messages.
In this post’s sample program, actors share a global state and process only one type of message, for sake of simplicity.
Let’s dive into the provided Golang program to understand how actors are implemented:
The Program
Sharing Data
Lets first demonstrate a struct
to share data across actors:
type ShareState struct {
mutex sync.Mutex
nActors int
nMsgsProcessed int
nMsgsSent int
}
Here, ShareState
is a shared data structure containing mutex-protected counters for the number of actors, messages processed, and messages sent. This ensures safe concurrent access to these variables.
ShareState
methods
These methods use mutexes to protect shared state variables (nMsgsSent
and nMsgsProcessed
) from concurrent access. By locking and unlocking the mutex during the critical sections of code where these variables are modified, the methods ensure that only one goroutine can modify the shared state at a time. This helps prevent data races and ensures the integrity of the shared state in a concurrent environment.
func (s *ShareState) incrementMsgsSent() {
s.mutex.Lock()
s.nMsgsSent += 1
s.mutex.Unlock()
}
func (s *ShareState) incrementMsgsProcessed() {
s.mutex.Lock()
s.nMsgsProcessed += 1
s.mutex.Unlock()
}
Breaking down the method’ code:
s.mutex.Lock()
: Acquires a lock on the mutex associated with the ShareState instance (s). This ensures exclusive access to shared data when incrementing the nMsgsProcessed counter.
s.nMsgsSent += 1
and s.nMsgsProcessed += 1
: Increment the nMsgsSent
and nMsgsProcessed
counter by 1. This counter represents the total number of messages sent and processed by actors respectively.
s.mutex.Unlock()
: Releases the lock on the mutex, allowing other goroutines
to access the shared data.
The Actor
struct
type Actor struct {
id string
channels *map[string]chan Message
shareData *ShareState
}
The Actor
struct represents an actor, holding its unique ID, a pointer to map of communication channels - this allows different actors to reference to the same object, the map is indexed by actor name, and each actor has its own channel -, and a reference to the shared state.
Creating actors
We now create a mechanism to create new actors:
func new(id string, channels *map[string]chan Message, channelCap int, sharedData *ShareState) (*Actor, error) {
if id == "all" {
return nil, errors.New("Actor id 'all' is reserved")
}
if _, ok := (*channels)[id]; ok {
return nil, errors.New("Actor id already exists")
}
channelsDeref := *channels
channelsDeref[id] = make(chan Message, channelCap)
fmt.Printf("Actor %s - Created\n", id)
sharedData.mutex.Lock()
sharedData.nActors += 1
sharedData.mutex.Unlock()
new_actor := Actor{id, channels, sharedData}
return &new_actor, nil
}
The new
function creates a new actor with a specified ID, communication channels, and shared data. Actors are added to the channel map, and the shared state is updated accordingly.
Sending messages
The send
method allows an actor to send a message to another actor or all actors. The messages are sent through channels, and the shared state is updated.
func (a *Actor) send(m string, to string) error {
channels := *a.channels
if len(channels) <= 1 {
return errors.New("No other actors to send message to")
}
if to == "all" {
for k, _ := range channels {
if k != a.id {
select {
case channels[k] <- Message{m, a.id}:
fmt.Printf("Actor %s - Sending message '%s' to 'all' - %s\n", a.id, m, k)
a.shareData.incrementMsgsSent()
default:
fmt.Printf("Actor %s - Channel is full\n", a.id)
return errors.New("Channel is full")
}
}
}
} else {
if _, ok := channels[to]; !ok {
errors.New("Channel not found!")
}
// prevents blocking the sender
select {
case channels[to] <- Message{m, a.id}:
fmt.Printf("Actor %s - Sending message '%s' to %s\n", a.id, m, to)
a.shareData.incrementMsgsSent()
default:
fmt.Printf("Actor %s - Channel is full\n", a.id)
return errors.New("Channel is full")
}
}
return nil
}
The send
method enables an actor to send messages to specific actors or all actors in a concurrent and non-blocking manner, using Golang’s select
statement for channel communication. It also includes error handling to account for situations like sending a message to a non-existing channel or a full channel.
Let’s break down how it works:
channels := *a.channels
: Retrieves the map of communication channels associated with the actor.
if len(channels) <= 1
: Checks if there is only one actor, meaning there’s no one else to send messages to. If true, it returns an error.
if to == "all"
: Handles the case where the message needs to be sent to all other actors.
- Iterates through each actor in the channel map using for
k, _ := range channels
.
- Checks if the actor ID (
k
) is not the sender’s ID (a.id
) to avoid sending a message to itself.
- Uses a
select
statement to attempt sending the message to the actor’s channel.
- If successful, prints a message, increments the sent message counter, and moves on to the next actor.
- If the channel is full, it prints an error message and returns an error.
- The
else
block: Handles the case where the message is sent to a specific actor (to is not “all”).
- Checks if the specified actor exists in the channel map.
- Uses a
select
statement to attempt sending the message to the specified actor’s (to
) channel.
- If successful, prints a message, increments the sent message counter.
- If the channel is full, it prints an error message and returns an error.
- Finally, returns
nil
if the message(s) are sent successfully, indicating no errors occurred.
Receiving messages
The receiveOnce
and receiveMany
methods enable an actor to process received messages. These methods update the shared state and print information about processed messages.
func (a *Actor) receiveOnce() (Message, error) {
channels := *a.channels
select {
case msg := <-channels[a.id]:
a.shareData.incrementMsgsProcessed()
fmt.Printf("Actor %s - Processed message '%s' from sender %s\n", a.id, msg.msg, msg.sender)
return msg, nil
default:
fmt.Printf("Actor %s - No messages to process.\n", a.id)
return Message{}, nil
}
}
func (a *Actor) receiveMany() ([]*Message, error) {
channels := *a.channels
var messages []*Message
// If the bellow snip was used instead, it would wait until channel was closed
// for msg := range channels[a.id]
for {
select {
case msg := <-channels[a.id]:
messages = append(messages, &msg)
a.shareData.incrementMsgsProcessed()
fmt.Printf("Actor %s - Processed message '%s' from sender %s\n", a.id, msg.msg, msg.sender)
default:
return messages, nil
}
}
}
The receiveOnce
method
The receiveOnce
method allows an actor to attempt to receive a single message from its own channel in a non-blocking manner.
If there is a message, it processes it and returns the message. Otherwise, if no message is available, it prints a message and returns an empty message.
The use of select
ensures that the actor can check for messages without getting blocked, making it suitable for scenarios where it needs to check for incoming messages periodically.
Breaking down the code:
channels := *a.channels
: dereferences the map of communication channels associated with the actor.
select
: Initiates a select
statement to handle multiple communication channels concurrently. It attempts to receive a message from the actor’s own channel (channels[a.id]
).
case msg := <-channels[a.id]:
: If a message is received from the actor’s channel, it executes the following:
- Increments the processed message counter in the shared state using
a.shareData.incrementMsgsProcessed()
.
- Prints a message indicating that the actor has processed the received message, including the message content and sender.
- Returns the received message and
nil
as the error.
default:
: If no message is received from the actor’s channel, it executes the following:
- Prints a message indicating that there are no messages to process for the actor.
- Returns an empty
Message{}
and nil
as the error.
The receiveMany
method
This receiveMany
method allows an actor to continuously receive messages from its own channel until there are no more messages to process. The loop is intentionally infinite to keep checking for messages without blocking, making it suitable for scenarios where the actor needs to process messages continuously. The use of select
ensures non-blocking behavior and allows the actor to respond to incoming messages as they arrive. The accumulated messages are then returned once there are no more messages to process.
Code Breakdown:
channels := *a.channels
: references the map of communication channels associated with the actor.
var messages []*Message
: Initializes an empty slice to store received messages.
- Infinite loop -
for {}
: Initiates an infinite loop to continuously check for messages from the actor’s channel.
select:
: Within the loop, it uses a select statement to handle multiple communication channels concurrently.
case msg := <-channels[a.id]:
: If a message is received from the actor’s channel, it executes the following:
- Appends a pointer to the received message to the
messages
slice.
- Increments the processed message counter in the shared state using
a.shareData.incrementMsgsProcessed()
.
- Prints a message indicating that the actor has processed the received message, including the message content and sender.
default:
: If no message is received from the actor’s channel, it returns the accumulated messages and nil
as the error.
Starting the actor system
func start(actors []*Actor) {
fmt.Println("Starting actors...")
for _, actor := range actors {
// synchronous call, so it waits for the actor to finish processing messages
// alternative would be to use a wait group
func(a Actor) {
if _, err := a.receiveMany(); err != nil {
fmt.Printf("Actor %s - No messages to process.\n", a.id)
}
}(*actor)
}
}
The start
function initiates the actors, making them concurrently process messages.
Running the Program
In the main
function, actors are created, messages are sent asynchronously, and the start
function is called to process the messages concurrently. Finally, the program prints the updated shared state.
func main() {
messageChannel := make(map[string]chan Message)
channelCap := 3
sharedData := ShareState{nActors: 0, nMsgsProcessed: 0, nMsgsSent: 0}
a1, _ := new("1", &messageChannel, channelCap, &sharedData)
a2, _ := new("2", &messageChannel, channelCap, &sharedData)
a3, _ := new("3", &messageChannel, channelCap, &sharedData)
fmt.Printf("N Actors: %d | Messages Sent: %d | Messages Received: %d\n", sharedData.nActors, sharedData.nMsgsSent, sharedData.nMsgsProcessed)
fmt.Println("---")
// send messages
// using sleep makes it easier to see the messages being processed
go a1.send("Viva 2", "2")
time.Sleep(1 * time.Second)
go a1.send("Bonjour 3", "3")
time.Sleep(1 * time.Second)
go a2.send("Olá 1", "1")
time.Sleep(1 * time.Second)
go a3.send("Hello World", "all")
time.Sleep(1 * time.Second)
go a3.send("Olá Mundo!", "all")
time.Sleep(1 * time.Second)
fmt.Println("---")
// await
start([]*Actor{a1, a2, a3})
fmt.Println("---")
fmt.Printf("N Actors: %d | Messages Sent: %d | Messages Received: %d\n", sharedData.nActors, sharedData.nMsgsSent, sharedData.nMsgsProcessed)
}
Sending more messages to a channel than its capacity allows can lead to the sender getting blocked until there is space in the buffer to accommodate the messages.
Given that the senders operate as goroutines
, there’s a risk of program termination before the sender completes, potentially resulting in inaccurate shared data output.
In the provided example, the channel has a maximum capacity of 3 messages, ensuring that the buffer won’t be overwhelmed. The use of select
further prevents blocking by allowing the program to execute the default
block when the buffer is full. Selecting an appropriate buffer size is crucial to guarantee the successful processing and sending of all messages.
Feel free to tweak the program and experiment with different buffer sizes to observe their impact on the output (try setting it to 1 ;).
The Whole Program
```go
package main
import (
"errors"
"fmt"
"sync"
"time"
)
type ShareState struct {
mutex sync.Mutex
nActors int
nMsgsProcessed int
nMsgsSent int
}
func (s *ShareState) incrementMsgsSent() {
s.mutex.Lock()
s.nMsgsSent += 1
s.mutex.Unlock()
}
func (s *ShareState) incrementMsgsProcessed() {
s.mutex.Lock()
s.nMsgsProcessed += 1
s.mutex.Unlock()
}
type Message struct {
msg interface{}
sender string
}
type Actor struct {
id string
channels *map[string]chan Message
shareData *ShareState
}
func new(id string, channels *map[string]chan Message, channelCap int, sharedData *ShareState) (*Actor, error) {
if id == "all" {
return nil, errors.New("Actor id 'all' is reserved")
}
if _, ok := (*channels)[id]; ok {
return nil, errors.New("Actor id already exists")
}
channelsDeref := *channels
channelsDeref[id] = make(chan Message, channelCap)
fmt.Printf("Actor %s - Created\n", id)
sharedData.mutex.Lock()
sharedData.nActors += 1
sharedData.mutex.Unlock()
new_actor := Actor{id, channels, sharedData}
return &new_actor, nil
}
func (a *Actor) send(m string, to string) error {
channels := *a.channels
if len(channels) <= 1 {
return errors.New("No other actors to send message to")
}
if to == "all" {
for k, _ := range channels {
if k != a.id {
select {
case channels[k] <- Message{m, a.id}:
fmt.Printf("Actor %s - Sending message '%s' to 'all' - %s\n", a.id, m, k)
a.shareData.incrementMsgsSent()
default:
fmt.Printf("Actor %s - Channel is full\n", a.id)
return errors.New("Channel is full")
}
}
}
} else {
if _, ok := channels[to]; !ok {
errors.New("Channel not found!")
}
// prevents blocking the sender
select {
case channels[to] <- Message{m, a.id}:
fmt.Printf("Actor %s - Sending message '%s' to %s\n", a.id, m, to)
a.shareData.incrementMsgsSent()
default:
fmt.Printf("Actor %s - Channel is full\n", a.id)
return errors.New("Channel is full")
}
}
return nil
}
func (a *Actor) receiveOnce() (Message, error) {
channels := *a.channels
select {
case msg := <-channels[a.id]:
a.shareData.incrementMsgsProcessed()
fmt.Printf("Actor %s - Processed message '%s' from sender %s\n", a.id, msg.msg, msg.sender)
return msg, nil
default:
fmt.Printf("Actor %s - No messages to process.\n", a.id)
return Message{}, nil
}
}
func (a *Actor) receiveMany() ([]*Message, error) {
channels := *a.channels
var messages []*Message
// If the bellow snip was used instead, it would wait until channel was closed
// for msg := range channels[a.id]
for {
select {
case msg := <-channels[a.id]:
messages = append(messages, &msg)
a.shareData.incrementMsgsProcessed()
fmt.Printf("Actor %s - Processed message '%s' from sender %s\n", a.id, msg.msg, msg.sender)
default:
return messages, nil
}
}
}
func start(actors []*Actor) {
fmt.Println("Starting actors...")
for _, actor := range actors {
// synchronous call, so it waits for the actor to finish processing messages
// alternative would be to use a wait group (TODO)
func(a Actor) {
if _, err := a.receiveMany(); err != nil {
fmt.Printf("Actor %s - No messages to process.\n", a.id)
}
}(*actor)
}
}
func main() {
messageChannel := make(map[string]chan Message)
channelCap := 3
sharedData := ShareState{nActors: 0, nMsgsProcessed: 0, nMsgsSent: 0}
a1, _ := new("1", &messageChannel, channelCap, &sharedData)
a2, _ := new("2", &messageChannel, channelCap, &sharedData)
a3, _ := new("3", &messageChannel, channelCap, &sharedData)
fmt.Printf("N Actors: %d | Messages Sent: %d | Messages Received: %d\n", sharedData.nActors, sharedData.nMsgsSent, sharedData.nMsgsProcessed)
fmt.Println("---")
// send messages
// using sleep makes it easier to see the messages being processed
go a1.send("Viva 2", "2")
time.Sleep(1 * time.Second)
go a1.send("Bonjour 3", "3")
time.Sleep(1 * time.Second)
go a2.send("Olá 1", "1")
time.Sleep(1 * time.Second)
go a3.send("Hello World", "all")
time.Sleep(1 * time.Second)
go a3.send("Olá Mundo!", "all")
time.Sleep(1 * time.Second)
fmt.Println("---")
// await
start([]*Actor{a1, a2, a3})
fmt.Println("---")
fmt.Printf("N Actors: %d | Messages Sent: %d | Messages Received: %d\n", sharedData.nActors, sharedData.nMsgsSent, sharedData.nMsgsProcessed)
}
```
Conclusion
In this blog post we explore asynchronous programming and the actor model in Golang, revealing the language’s robust support for concurrent systems. The provided program demonstrates the creation and communication of actors through channels, shared state management and mutex synchronization for safe concurrency.
Golang’s goroutines and channels facilitate a clean and expressive implementation of the actor model, making a great for building scalable and efficient concurrent systems.
Golang’s features, offer a powerful paradigm for handling concurrent tasks and designing responsive, resilient and event-driven applications.
References