Synchronization
Now that you understand applications, RPC communication, and drivers, let’s explore how to coordinate multiple threads working together. This example shows how to use CMRX’s notification system to synchronize threads safely and efficiently.
What You’ll Learn
- How to coordinate multiple threads within an application
- Using
wait_for_object()andnotify_object()for synchronization - Understanding priority-based notification ordering
- Implementing producer-consumer patterns
- Using timeouts to prevent deadlocks
- Building thread-safe data structures
Why Thread Synchronization is Needed
In embedded systems, you often have multiple threads that need to coordinate their work:
- Producer-Consumer: One thread generates data, another processes it
- Resource Sharing: Multiple threads need to access shared hardware or data
- Event Coordination: Threads need to wait for events from other threads
- State Synchronization: Threads need to coordinate state changes
Without proper synchronization, you get race conditions - unpredictable behavior when threads access shared resources simultaneously.
CMRX Notification System
CMRX provides a simple but powerful notification system based on object addresses:
Key Functions
// Wait for notification on an object
int wait_for_object(void* object, unsigned int timeout);
// Notify one waiting thread
void notify_object(void* object);
How It Works
- Object-Based: Any variable’s address can be used as a synchronization point
- Priority-Based: Highest priority waiting thread gets notified first
- Single Notification: Only one thread is woken up per
notify_object()call - Timeout Support: Prevent threads from waiting forever
Basic Synchronization Example
Let’s create a simple producer-consumer example where one thread generates data and another processes it:
#include <cmrx/application.h>
#include <cmrx/ipc/notify.h>
#include <stdio.h>
// Shared data structure
struct DataBuffer {
int data[10];
int write_index;
int read_index;
int count;
bool running;
};
// Our shared buffer instance
static struct DataBuffer buffer = {
.write_index = 0,
.read_index = 0,
.count = 0,
.running = true
};
// We'll use these addresses as notification objects
static int producer_signal = 0; // Signal when data is available
static int consumer_signal = 0; // Signal when space is available
Producer Thread
The producer generates data and notifies the consumer:
void producer_thread(void* data) {
int value = 1;
while (buffer.running) {
// Wait if buffer is full
while (buffer.count >= 10) {
printf("Producer: Buffer full, waiting for space...\n");
// Wait for consumer to make space (with 5 second timeout)
int result = wait_for_object(&consumer_signal, 5000000);
if (result != 0) {
printf("Producer: Timeout waiting for space!\n");
continue;
}
}
// Add data to buffer
buffer.data[buffer.write_index] = value;
buffer.write_index = (buffer.write_index + 1) % 10;
buffer.count++;
printf("Producer: Added %d to buffer (count: %d)\n", value, buffer.count);
value++;
// Notify consumer that data is available
notify_object(&producer_signal);
// Simulate work
for (volatile int i = 0; i < 100000; i++);
}
}
Consumer Thread
The consumer waits for data and processes it:
void consumer_thread(void* data) {
while (buffer.running) {
// Wait if buffer is empty
while (buffer.count == 0) {
printf("Consumer: Buffer empty, waiting for data...\n");
// Wait for producer to add data (with 3 second timeout)
int result = wait_for_object(&producer_signal, 3000000);
if (result != 0) {
printf("Consumer: Timeout waiting for data!\n");
continue;
}
}
// Remove data from buffer
int value = buffer.data[buffer.read_index];
buffer.read_index = (buffer.read_index + 1) % 10;
buffer.count--;
printf("Consumer: Processed %d from buffer (count: %d)\n", value, buffer.count);
// Notify producer that space is available
notify_object(&consumer_signal);
// Simulate processing work
for (volatile int i = 0; i < 200000; i++);
}
}
Control Thread
A third thread that can stop the system:
void control_thread(void* data) {
// Let the system run for a while
for (volatile int i = 0; i < 10000000; i++);
printf("Control: Shutting down system...\n");
buffer.running = false;
// Wake up any waiting threads so they can exit
notify_object(&producer_signal);
notify_object(&consumer_signal);
}
Application Setup
// Create the application with all threads
OS_APPLICATION(sync_example);
// Create threads with different priorities
OS_THREAD_CREATE_PRIORITY(sync_example, producer_thread, NULL, 128, 10);
OS_THREAD_CREATE_PRIORITY(sync_example, consumer_thread, NULL, 128, 10);
OS_THREAD_CREATE_PRIORITY(sync_example, control_thread, NULL, 64, 5);
Advanced Example: Resource Pool
Here’s a more sophisticated example that manages a pool of shared resources:
#include <cmrx/application.h>
#include <cmrx/ipc/notify.h>
#define MAX_RESOURCES 3
// Resource pool structure
struct ResourcePool {
int resources[MAX_RESOURCES];
bool available[MAX_RESOURCES];
int available_count;
};
static struct ResourcePool pool = {
.resources = {100, 200, 300}, // Resource IDs
.available = {true, true, true},
.available_count = MAX_RESOURCES
};
// Notification object for resource availability
static int resource_available = 0;
// Acquire a resource from the pool
int acquire_resource(unsigned int timeout_us) {
while (pool.available_count == 0) {
printf("Thread: No resources available, waiting...\n");
// Wait for a resource to become available
int result = wait_for_object(&resource_available, timeout_us);
if (result != 0) {
printf("Thread: Timeout acquiring resource!\n");
return -1; // Timeout
}
}
// Find and allocate first available resource
for (int i = 0; i < MAX_RESOURCES; i++) {
if (pool.available[i]) {
pool.available[i] = false;
pool.available_count--;
printf("Thread: Acquired resource %d (remaining: %d)\n",
pool.resources[i], pool.available_count);
return pool.resources[i];
}
}
return -1; // Should never reach here
}
// Release a resource back to the pool
void release_resource(int resource_id) {
for (int i = 0; i < MAX_RESOURCES; i++) {
if (pool.resources[i] == resource_id && !pool.available[i]) {
pool.available[i] = true;
pool.available_count++;
printf("Thread: Released resource %d (available: %d)\n",
resource_id, pool.available_count);
// Notify waiting threads that a resource is available
notify_object(&resource_available);
return;
}
}
printf("Thread: Error - resource %d not found or already available!\n", resource_id);
}
// Worker thread that uses resources
void worker_thread(void* data) {
int worker_id = (int)data;
for (int i = 0; i < 5; i++) {
// Try to acquire a resource (2 second timeout)
int resource = acquire_resource(2000000);
if (resource < 0) {
printf("Worker %d: Failed to acquire resource\n", worker_id);
continue;
}
// Use the resource
printf("Worker %d: Using resource %d\n", worker_id, resource);
// Simulate work with the resource
for (volatile int j = 0; j < 500000; j++);
// Release the resource
release_resource(resource);
// Brief pause before next iteration
for (volatile int j = 0; j < 100000; j++);
}
printf("Worker %d: Finished\n", worker_id);
}
// Create multiple worker threads
OS_APPLICATION(resource_pool_example);
OS_THREAD_CREATE(resource_pool_example, worker_thread, (void*)1, 128);
OS_THREAD_CREATE(resource_pool_example, worker_thread, (void*)2, 128);
OS_THREAD_CREATE(resource_pool_example, worker_thread, (void*)3, 128);
OS_THREAD_CREATE(resource_pool_example, worker_thread, (void*)4, 128);
Key Concepts and Best Practices
1. Object Selection
- Any Address Works: You can use any variable’s address as a notification object
- Dedicated Objects: Use dedicated variables for clarity (like
producer_signal) - Multiple Objects: Use different objects for different types of events
2. Priority Handling
- Highest Priority First: Higher priority threads get notified first
- Single Notification: Only one thread is woken up per
notify_object()call - Fair Scheduling: Use thread priorities to ensure important threads get resources first
3. Timeout Usage
- Prevent Deadlocks: Always use timeouts to prevent threads from waiting forever
- Microsecond Resolution: Timeout values are in microseconds
- Zero Timeout: Use
0for infinite waiting (use with caution)
4. Common Patterns
Producer-Consumer:
// Producer
add_data_to_buffer();
notify_object(&data_available);
// Consumer
wait_for_object(&data_available, timeout);
process_data_from_buffer();
Resource Pool:
// Acquire
while (no_resources_available()) {
wait_for_object(&resource_free, timeout);
}
allocate_resource();
// Release
free_resource();
notify_object(&resource_free);
Event Coordination:
// Event setter
set_event_flag();
notify_object(&event_occurred);
// Event waiter
wait_for_object(&event_occurred, timeout);
handle_event();
Error Handling and Debugging
Return Value Checking
int result = wait_for_object(&my_object, 1000000);
if (result != 0) {
// Handle timeout or error
printf("Wait failed with code: %d\n", result);
}
Common Issues to Avoid
- Forgetting Timeouts: Always use reasonable timeouts
- Wrong Object: Make sure all threads use the same object address
- Missing Notifications: Ensure
notify_object()is called when conditions change - Race Conditions: Protect shared data structure modifications
Performance Considerations
- Minimal Overhead: Notification system has very low overhead
- No Memory Allocation: Uses existing variable addresses
- Priority Scheduling: Efficient priority-based wakeup
- Single Wakeup: Only one thread is woken per notification (efficient)
Next Steps
Try experimenting with these synchronization patterns:
- Build a multi-threaded sensor reading system
- Create a thread-safe logging system
- Implement a work queue with multiple worker threads
- Add synchronization to your existing RPC servers
- Create a thread-safe configuration system
This notification system provides the foundation for building complex, coordinated embedded applications where multiple threads work together efficiently and safely.