In embedded systems, different parts of your application often need to exchange data asynchronously. For example, an interrupt handler might collect sensor readings that a main thread processes later, or multiple threads might need to share work items. While an interrupt handler may use a statically allocated buffer together with the isr_notify_object() call, this is only feasible for use cases where queueing is not needed. The CMRX kernel doesn't provide any queueing system calls, but a queue library is available which implements this functionality fully in userspace.
What are Queues?

Queues are FIFO (First In, First Out) data structures that allow one part of your program to send data to another part asynchronously. Think of them like a conveyor belt: items go in one end and come out the other in the same order.
CMRX offers two different queue implementations for different use cases. Regardless of which implementation is used, queue support has to be added to the project in the build system with the following call:

```cmake
target_add_applications(<firmware_name> queue_server)
```
This instructs CMake to link the implementation of the queue library and the queue server into the application. <firmware_name> stands for the name of your firmware target created by the add_executable() call.
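A minimal CMakeLists.txt fragment might look like this (the firmware name my_firmware and the source list are placeholders for your own project):

```cmake
# Hypothetical firmware target; substitute your own name and sources
add_executable(my_firmware main.c)

# Link the userspace queue library and the queue server into the firmware
target_add_applications(my_firmware queue_server)
```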
The queue implementation used in CMRX is lock-free. It is thread-safe in the single-producer, single-consumer case. In all other cases, access to the queue has to be protected using mutexes.
The local queue library is useful when multiple threads running in the same process context need to synchronize data processing. This often happens when an RPC call needs to pass data to the main server thread. An RPC call executes in the context of the server process: while the thread itself belongs to the client process, during the RPC call it runs in the server process context and can access the server's memory. Here a plain userspace queue can be used to synchronize the data.
Queues are allocated in the process address space and thus are not visible from outside of the process. This means they can't be used to synchronize data across processes. If that is needed, the following mechanism can be used.
The queue server is an RPC service that provides an API to access queues from multiple processes. It works by creating queues inside the queue server's process address space and providing access to them via an RPC API.
This example focuses on the local queue library, which is most commonly used for driver and interrupt scenarios. As RPC calls are the main way of interfacing between processes in the CMRX ecosystem, most use cases are covered by the plain queue library, and the queue server is rarely needed.
Let’s start with a simple producer-consumer example where one thread generates work items and another thread processes them.
```c
#include <cmrx/application.h>
#include <extra/queue_server/queue.h>

#include <stdbool.h>
#include <stdio.h>
#include <string.h>

// Define what data we'll store in the queue
struct WorkItem {
    int task_id;
    int priority;
    char description[32];
};

// Create a queue that can hold 16 work items (calculate total size needed)
#define WORK_QUEUE_SIZE (16 * sizeof(struct WorkItem))
STATIC_QUEUE_T(WorkQueue, WORK_QUEUE_SIZE);

static WorkQueue work_queue;

// Global variables for coordination
static volatile bool system_running = true;
static int next_task_id = 1;
```
What This Does:

- STATIC_QUEUE_T creates a custom queue type for our specific data
- The volatile keyword ensures the compiler doesn't optimize away the running flag

```c
void work_system_init(void) {
    // Initialize the queue
    if (!queue_init(&work_queue, 16, sizeof(struct WorkItem))) {
        printf("Failed to initialize work queue\n");
        return;
    }

    printf("Work processing system initialized\n");
    system_running = true;
}
```
```c
// Work generator thread - this is the "producer"
void work_generator_thread(void * data) {
    while (system_running) {
        // Create a new work item
        struct WorkItem item = {
            .task_id = next_task_id++,
            .priority = (next_task_id % 3) + 1, // Priority 1-3
            .description = {0}
        };

        // Create description based on task type
        if (item.priority == 1) {
            strcpy(item.description, "Low priority cleanup");
        } else if (item.priority == 2) {
            strcpy(item.description, "Normal data processing");
        } else {
            strcpy(item.description, "High priority alert");
        }

        // Try to add it to the queue
        if (!queue_full(&work_queue)) {
            printf("Generated: Task %d (%s)\n", item.task_id, item.description);
            queue_send(&work_queue, &item);
        } else {
            printf("Queue full! Dropping task %d\n", item.task_id);
        }

        // Generate work every 500ms
        usleep(500000);
    }
}
```
Key Points:

- queue_full() is checked before sending; otherwise queue_send() will block if the queue becomes full

```c
// Work processor thread - this is the "consumer"
void work_processor_thread(void * data) {
    struct WorkItem item;

    while (system_running) {
        // Wait for work to become available (blocking call)
        if (queue_receive(&work_queue, &item)) {
            printf("Processing: Task %d (Priority %d) - %s\n",
                   item.task_id, item.priority, item.description);

            // Simulate processing time based on priority
            int processing_time = (4 - item.priority) * 200000; // High priority = less time (microseconds)
            usleep(processing_time);

            printf("Completed: Task %d\n", item.task_id);
        }
    }
}
```
What This Does:

- queue_receive() blocks until work is available

```c
// Create the application with both threads
OS_APPLICATION(work_app);
OS_THREAD_CREATE(work_app, work_generator_thread, NULL, 2); // Priority 2
OS_THREAD_CREATE(work_app, work_processor_thread, NULL, 1); // Priority 1 (higher)
```
Although the queue library is a userspace library, it can be used from interrupt context too. This is useful when an interrupt handler needs to pass data for later processing and that data may arrive in bursts; queueing it avoids data loss. The implementation of queue_send() notifies any waiting threads using the notify_object() call. Since this call is not possible from interrupt context, interrupt handlers need to use queue_send_silent() instead.
The only difference between queue_send() and queue_send_silent() is that the latter doesn't perform any notification. The interrupt handler therefore has to notify the queue object explicitly by calling isr_notify_object():
```c
void ISRHandler() {
    /* ... */
    queue_send_silent(&queue, &data);
    isr_notify_object(&queue);
    /* ... */
}
```
If you need communication between different applications, use the queue server:
```c
#include <cmrx/aux/queue_server.h>
#include <cmrx/ipc/rpc.h>

// Create a cross-process queue
queue_id_t shared_queue;
rpc_call(queue_server, create, &shared_queue, sizeof(struct MyData), 32);

// Use it from any application
struct MyData data = {42, "Hello"};
rpc_call(queue_server, send, shared_queue, &data);
```
Queues are fundamental building blocks for robust embedded systems, providing clean separation between data collection, processing, and output while maintaining system responsiveness.