Now that you understand applications, RPC communication, and drivers, let’s explore how to coordinate multiple threads working together. This example shows how to use CMRX’s notification system to synchronize threads safely and efficiently.
- Using `wait_for_object()` and `notify_object()` for synchronization

In embedded systems, you often have multiple threads that need to coordinate their work.
Without proper synchronization, you get race conditions - unpredictable behavior when threads access shared resources simultaneously.
CMRX provides a simple but powerful notification system based on object addresses:
// Wait for notification on an object
// The object is identified by its address. The examples in this tutorial
// pass the timeout in microseconds (5000000 for a 5 second wait) and treat
// a return value of 0 as "notified" and any non-zero value as timeout/error.
int wait_for_object(void* object, unsigned int timeout);
// Notify one waiting thread
// Only a thread waiting on the same object address is a candidate for wake-up.
void notify_object(void* object);
- One waiting thread is woken per `notify_object()` call

Let’s create a simple producer-consumer example where one thread generates data and another processes it:
#include <cmrx/application.h>
#include <cmrx/ipc/notify.h>
#include <stdio.h>
// Shared data structure
// Fixed-size ring buffer shared by the producer and consumer threads.
struct DataBuffer {
int data[10];     // Storage for queued values
int write_index;  // Slot the producer writes next; wraps modulo 10
int read_index;   // Slot the consumer reads next; wraps modulo 10
int count;        // Number of values currently queued
bool running;     // Cleared by the control thread to request shutdown
};
// Our shared buffer instance
// Starts empty (both indices and count at zero) with the run flag set;
// the data array is zero-initialized because it is not listed.
static struct DataBuffer buffer = {
.write_index = 0,
.read_index = 0,
.count = 0,
.running = true
};
// We'll use these addresses as notification objects
// Only the addresses matter: the stored values are never read or written
// by the example threads.
static int producer_signal = 0; // Signal when data is available (notified by the producer)
static int consumer_signal = 0; // Signal when space is available (notified by the consumer)
The producer generates data and notifies the consumer:
/**
 * Producer thread entry point.
 *
 * Appends an increasing sequence of integers (starting at 1) to the shared
 * ring buffer and notifies the consumer after each insertion. When the
 * buffer is full it blocks on consumer_signal until space is reported or
 * the wait times out.
 *
 * The thread returns once buffer.running is cleared, including when it is
 * blocked waiting for space at that moment.
 *
 * NOTE(review): buffer is shared with the consumer without any locking;
 * confirm that concurrent updates of buffer.count cannot interleave on the
 * target scheduler.
 *
 * @param data Unused thread argument.
 */
void producer_thread(void* data) {
    (void)data;

    // Derive the capacity from the array so it cannot drift from the struct.
    const int capacity = (int)(sizeof buffer.data / sizeof buffer.data[0]);
    int value = 1;

    while (buffer.running) {
        // Wait if buffer is full. The run flag is re-checked on every
        // wake-up: without it, a shutdown request arriving while the buffer
        // stays full would leave this loop spinning on timeouts forever.
        while (buffer.running && buffer.count >= capacity) {
            printf("Producer: Buffer full, waiting for space...\n");
            // Wait for consumer to make space (with 5 second timeout)
            int result = wait_for_object(&consumer_signal, 5000000);
            if (result != 0) {
                printf("Producer: Timeout waiting for space!\n");
            }
        }
        if (!buffer.running) {
            break; // Shutdown requested while waiting; do not produce more
        }

        // Add data to buffer
        buffer.data[buffer.write_index] = value;
        buffer.write_index = (buffer.write_index + 1) % capacity;
        buffer.count++;
        printf("Producer: Added %d to buffer (count: %d)\n", value, buffer.count);
        value++;

        // Notify consumer that data is available
        notify_object(&producer_signal);

        // Simulate work
        for (volatile int i = 0; i < 100000; i++);
    }
}
The consumer waits for data and processes it:
/**
 * Consumer thread entry point.
 *
 * Removes values from the shared ring buffer in FIFO order, prints them and
 * notifies the producer after each removal. When the buffer is empty it
 * blocks on producer_signal until data is reported or the wait times out.
 *
 * The thread returns once buffer.running is cleared, including when it is
 * blocked waiting for data at that moment. Values still queued at shutdown
 * are left unprocessed.
 *
 * NOTE(review): buffer is shared with the producer without any locking;
 * confirm that concurrent updates of buffer.count cannot interleave on the
 * target scheduler.
 *
 * @param data Unused thread argument.
 */
void consumer_thread(void* data) {
    (void)data;

    // Derive the capacity from the array so it cannot drift from the struct.
    const int capacity = (int)(sizeof buffer.data / sizeof buffer.data[0]);

    while (buffer.running) {
        // Wait if buffer is empty. The run flag is re-checked on every
        // wake-up: the shutdown notification arrives with the buffer still
        // empty, so checking only the count would keep this thread waiting
        // (and timing out) forever.
        while (buffer.running && buffer.count == 0) {
            printf("Consumer: Buffer empty, waiting for data...\n");
            // Wait for producer to add data (with 3 second timeout)
            int result = wait_for_object(&producer_signal, 3000000);
            if (result != 0) {
                printf("Consumer: Timeout waiting for data!\n");
            }
        }
        if (!buffer.running) {
            break; // Shutdown requested while waiting
        }

        // Remove data from buffer
        int value = buffer.data[buffer.read_index];
        buffer.read_index = (buffer.read_index + 1) % capacity;
        buffer.count--;
        printf("Consumer: Processed %d from buffer (count: %d)\n", value, buffer.count);

        // Notify producer that space is available
        notify_object(&consumer_signal);

        // Simulate processing work
        for (volatile int i = 0; i < 200000; i++);
    }
}
A third thread that can stop the system:
/**
 * Control thread entry point.
 *
 * Burns time in a busy loop so the other threads can run, then clears the
 * shared run flag and notifies both signal objects.
 *
 * @param data Thread argument; not used.
 */
void control_thread(void* data) {
    // Give the rest of the system some time to run.
    volatile int spin = 0;
    while (spin < 10000000) {
        spin++;
    }

    printf("Control: Shutting down system...\n");
    buffer.running = false;

    // Threads blocked in a wait need a wake-up before they can act on the
    // cleared flag, so signal both notification objects.
    notify_object(&producer_signal);
    notify_object(&consumer_signal);
}
// Create the application with all threads
OS_APPLICATION(sync_example);
// Register the three threads with the application. The producer and consumer
// are created with identical trailing arguments (128, 10); the control thread
// uses (64, 5).
// NOTE(review): the meaning of the two trailing numeric arguments is not
// visible here (presumably stack size and priority) - confirm against the
// CMRX macro definition.
OS_THREAD_CREATE_PRIORITY(sync_example, producer_thread, NULL, 128, 10);
OS_THREAD_CREATE_PRIORITY(sync_example, consumer_thread, NULL, 128, 10);
OS_THREAD_CREATE_PRIORITY(sync_example, control_thread, NULL, 64, 5);
Here’s a more sophisticated example that manages a pool of shared resources:
#include <cmrx/application.h>
#include <cmrx/ipc/notify.h>
#include <stdint.h>
// Number of resources managed by the pool
#define MAX_RESOURCES 3
// Resource pool structure
// Parallel arrays: available[i] tells whether resources[i] is currently free.
struct ResourcePool {
int resources[MAX_RESOURCES];  // Resource IDs handed out to callers
bool available[MAX_RESOURCES]; // true while the matching resource is free
int available_count;           // Number of entries in available[] that are true
};
// The single pool instance; every resource starts out free.
static struct ResourcePool pool = {
.resources = {100, 200, 300}, // Resource IDs
.available = {true, true, true},
.available_count = MAX_RESOURCES
};
// Notification object for resource availability
// Only its address is used: waited on in acquire_resource(), notified in
// release_resource().
static int resource_available = 0;
/**
 * Acquire a resource from the pool.
 *
 * Blocks while the pool reports no free resources, waiting on
 * resource_available between checks. The first wait that fails ends the
 * attempt.
 *
 * @param timeout_us Timeout passed to each individual wait_for_object() call.
 * @return ID of the acquired resource, or -1 if a wait failed or no free
 *         slot was found.
 */
int acquire_resource(unsigned int timeout_us) {
    for (;;) {
        if (pool.available_count != 0) {
            break;
        }
        printf("Thread: No resources available, waiting...\n");
        // Sleep until some thread releases a resource
        if (wait_for_object(&resource_available, timeout_us) != 0) {
            printf("Thread: Timeout acquiring resource!\n");
            return -1; // Timeout
        }
    }

    // Locate the lowest-indexed free slot
    int slot = 0;
    while (slot < MAX_RESOURCES && !pool.available[slot]) {
        slot++;
    }
    if (slot == MAX_RESOURCES) {
        return -1; // Not expected: the count said a slot was free
    }

    // Mark it as taken and hand its ID to the caller
    pool.available[slot] = false;
    pool.available_count--;
    printf("Thread: Acquired resource %d (remaining: %d)\n",
           pool.resources[slot], pool.available_count);
    return pool.resources[slot];
}
/**
 * Release a resource back to the pool.
 *
 * Marks the matching in-use resource as free, bumps the free counter and
 * notifies resource_available. If no in-use resource carries the given ID,
 * an error message is printed and the pool is left untouched.
 *
 * @param resource_id ID previously returned by acquire_resource().
 */
void release_resource(int resource_id) {
    int idx;

    for (idx = 0; idx < MAX_RESOURCES; ++idx) {
        // Skip entries with a different ID and entries that are already free
        if (pool.resources[idx] != resource_id || pool.available[idx]) {
            continue;
        }

        pool.available[idx] = true;
        pool.available_count++;
        printf("Thread: Released resource %d (available: %d)\n",
               resource_id, pool.available_count);

        // Let a waiting thread know that a resource is free again
        notify_object(&resource_available);
        return;
    }

    printf("Thread: Error - resource %d not found or already available!\n", resource_id);
}
/**
 * Worker thread that uses resources.
 *
 * Runs five rounds. Each round tries to acquire a pool resource with a
 * 2 second timeout, uses it for a simulated workload, releases it and then
 * pauses briefly. A round whose acquisition fails is skipped.
 *
 * @param data Worker number smuggled through the thread argument as an
 *             integer cast to a pointer (e.g. (void*)1).
 */
void worker_thread(void* data) {
    // Convert through intptr_t: casting a pointer straight to int is not
    // portable when pointers are wider than int.
    int worker_id = (int)(intptr_t)data;

    for (int i = 0; i < 5; i++) {
        // Try to acquire a resource (2 second timeout)
        int resource = acquire_resource(2000000);
        if (resource < 0) {
            printf("Worker %d: Failed to acquire resource\n", worker_id);
            continue;
        }

        // Use the resource
        printf("Worker %d: Using resource %d\n", worker_id, resource);

        // Simulate work with the resource
        for (volatile int j = 0; j < 500000; j++);

        // Release the resource
        release_resource(resource);

        // Brief pause before next iteration
        for (volatile int j = 0; j < 100000; j++);
    }

    printf("Worker %d: Finished\n", worker_id);
}
// Create multiple worker threads
// Four workers share a pool of MAX_RESOURCES (3) resources, so at least one
// worker has to wait whenever all of them want a resource at once. The third
// macro argument is the worker number that worker_thread() receives as data.
// NOTE(review): the meaning of the trailing 128 is not visible here -
// confirm against the CMRX macro definition.
OS_APPLICATION(resource_pool_example);
OS_THREAD_CREATE(resource_pool_example, worker_thread, (void*)1, 128);
OS_THREAD_CREATE(resource_pool_example, worker_thread, (void*)2, 128);
OS_THREAD_CREATE(resource_pool_example, worker_thread, (void*)3, 128);
OS_THREAD_CREATE(resource_pool_example, worker_thread, (void*)4, 128);
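Key points:

- Notification objects are identified by their address; give them meaningful names (e.g. `producer_signal`)
- One waiting thread is woken per `notify_object()` call
- Pass a timeout of `0` for infinite waiting (use with caution)

Producer-Consumer: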
// Pattern sketch (pseudo-code): the helper functions and data_available are
// placeholders, not real APIs.
// Producer: publish the data first, then notify
add_data_to_buffer();
notify_object(&data_available);
// Consumer: block until notified, then process
// (real code should check the return value of wait_for_object())
wait_for_object(&data_available, timeout);
process_data_from_buffer();
Resource Pool:
// Pattern sketch (pseudo-code): the helper functions and resource_free are
// placeholders, not real APIs.
// Acquire: re-check the condition in a loop after every wake-up
while (no_resources_available()) {
wait_for_object(&resource_free, timeout);
}
allocate_resource();
// Release: return the resource first, then notify a waiter
free_resource();
notify_object(&resource_free);
Event Coordination:
// Pattern sketch (pseudo-code): the helper functions and event_occurred are
// placeholders, not real APIs.
// Event setter: record the event first, then notify
set_event_flag();
notify_object(&event_occurred);
// Event waiter: block until notified, then handle the event
wait_for_object(&event_occurred, timeout);
handle_event();
// Error handling: always check the result of wait_for_object().
// A non-zero result means the wait did not end with a notification.
int result = wait_for_object(&my_object, 1000000);
if (result != 0) {
// Handle timeout or error
printf("Wait failed with code: %d\n", result);
}
- Make sure `notify_object()` is called when conditions change

Try experimenting with these synchronization patterns.
This notification system provides the foundation for building complex, coordinated embedded applications where multiple threads work together efficiently and safely.