Queues
In embedded systems, different parts of your application often need to exchange data asynchronously. For example, an interrupt handler might collect sensor readings that a main thread processes later, or multiple threads might need to share work items. An interrupt handler can get by with a statically allocated buffer and an isr_notify_object() call, but only in use cases where no queueing is needed. The CMRX kernel doesn’t provide any queueing system calls; instead, a queue library implemented entirely in userspace provides this functionality.
What You’ll Learn
- How to use intra-process queues for thread and interrupt communication
- When to use local queues vs cross-process queue server
- Producer-consumer patterns in embedded systems
- Safe data exchange between interrupts and threads
What are Queues?
Queues are FIFO (First In, First Out) data structures that allow one part of your program to send data to another part asynchronously. Think of them like a conveyor belt - items go in one end and come out the other in the same order.
Understanding CMRX Queue Options
CMRX offers two different queue implementations for different use cases. Regardless of which implementation is used, queue support has to be added to the project in the build system with the following call:
target_add_applications(<firmware_name> queue_server)
This instructs CMake to link the queue library and queue server implementation into the application. <firmware_name> stands for the name of your firmware target created by the add_executable() call.
The queue implementation used in CMRX is lock-free. It is thread-safe in the single-producer, single-consumer case. In all other cases, access to the queue has to be protected with mutexes.
Local Queue Library
- Scope: Single process only (threads + interrupts within one application)
- Performance: Very fast, no RPC overhead
- Use case: Interrupt-to-thread communication, producer-consumer within one process
- Memory isolation: Cannot cross process boundaries
The local queue library is useful when multiple threads running in the same process context need to synchronize data processing. This often happens when an RPC call needs to pass data to the main server thread. An RPC call executes in the context of the server process: although the calling thread belongs to the client process, during the RPC call it runs in the server process context and can access the server’s memory. Here a plain userspace queue can be used to synchronize the data.
Queues are allocated in the process address space and thus are not visible from outside the process. This means they can’t be used to synchronize data across processes. If that is needed, the following mechanism can be used.
Queue Server
- Scope: Cross-process communication
- Performance: Slower due to RPC calls
- Use case: Communication between different applications
- Memory isolation: Crosses process boundaries safely
The queue server is an RPC service that provides an API for accessing queues from multiple processes. It works by creating queues inside the queue server’s process address space and providing access to them via an RPC API.
This example focuses on the local queue library, which is the most common choice for driver and interrupt scenarios. Since RPC calls are the main way of interfacing between processes in the CMRX ecosystem, most use cases are served well by the plain queue library, and the queue server is rarely needed.
Basic Queue Usage Example
Let’s start with a simple producer-consumer example where one thread generates work items and another thread processes them.
Step 1: Include Queue Library and Set Up Data Structures
#include <cmrx/application.h>
#include <extra/queue_server/queue.h>
// Define what data we'll store in the queue
struct WorkItem {
int task_id;
int priority;
char description[32];
};
// Create a queue that can hold 16 work items (calculate total size needed)
#define WORK_QUEUE_SIZE (16 * sizeof(struct WorkItem))
STATIC_QUEUE_T(WorkQueue, WORK_QUEUE_SIZE);
static WorkQueue work_queue;
// Global variables for coordination
static volatile bool system_running = true;
static int next_task_id = 1;
What This Does:
- STATIC_QUEUE_T creates a custom queue type for our specific data
- The queue can hold 16 work items before becoming full
- The volatile keyword ensures the compiler doesn’t optimize away the running flag
Step 2: Initialize the Queue
void work_system_init(void) {
// Initialize the queue
if (!queue_init(&work_queue, 16, sizeof(struct WorkItem))) {
printf("Failed to initialize work queue\n");
return;
}
printf("Work processing system initialized\n");
system_running = true;
}
Step 3: Producer Thread - Creates Work
// Work generator thread - this is the "producer"
void work_generator_thread(void * data) {
while (system_running) {
// Create a new work item
struct WorkItem item = {
.task_id = next_task_id++,
.priority = (next_task_id % 3) + 1, // Priority 1-3
.description = {0}
};
// Create description based on task type
if (item.priority == 1) {
strcpy(item.description, "Low priority cleanup");
} else if (item.priority == 2) {
strcpy(item.description, "Normal data processing");
} else {
strcpy(item.description, "High priority alert");
}
// Try to add it to the queue
if (!queue_full(&work_queue)) {
printf("Generated: Task %d (%s)\n", item.task_id, item.description);
queue_send(&work_queue, &item);
} else {
printf("Queue full! Dropping task %d\n", item.task_id);
}
// Generate work every 500ms
usleep(500000);
}
}
Key Points:
- Generates work items with different priorities
- Uses queue_send(), which will block if the queue becomes full
- Checks queue status first to handle overflow gracefully
Step 4: Consumer Thread - Processes Work
// Work processor thread - this is the "consumer"
void work_processor_thread(void * data) {
struct WorkItem item;
while (system_running) {
// Wait for work to become available (blocking call)
if (queue_receive(&work_queue, &item)) {
printf("Processing: Task %d (Priority %d) - %s\n",
item.task_id, item.priority, item.description);
// Simulate processing time based on priority
int processing_time = (4 - item.priority) * 200000; // High priority = less time (microseconds)
usleep(processing_time);
printf("Completed: Task %d\n", item.task_id);
}
}
}
What This Does:
- queue_receive() blocks until work is available
- Processes items in FIFO order
- Simulates different processing times based on priority
Step 5: Create the Application
// Create the application with both threads
OS_APPLICATION(work_app);
OS_THREAD_CREATE(work_app, work_generator_thread, NULL, 2); // Priority 2
OS_THREAD_CREATE(work_app, work_processor_thread, NULL, 1); // Priority 1 (higher)
Interrupts and Queues
Although the queue library is a userspace library, it can be used from interrupt context too. This is useful when an interrupt handler needs to pass along data, possibly arriving in bursts, for later processing. In that case it is desirable to queue the data to avoid data loss. The implementation of queue_send() notifies any waiting threads using the notify_object() call. Because this call is not possible from interrupt context, interrupts need to use queue_send_silent() instead.
The only difference between queue_send() and queue_send_silent() is that the latter doesn’t perform any notification. The interrupt handler therefore needs to perform the notification explicitly by calling isr_notify_object() on the queue object:
void ISRHandler() {
/* ... */
queue_send_silent(&queue, &data);
isr_notify_object(&queue);
/* ... */
}
Performance Considerations
- Local queues are faster than cross-process queue server
- Larger queues reduce blocking but use more memory
- Multiple small queues often better than one large queue
When to Use Cross-Process Queues
If you need communication between different applications, use the queue server:
#include <cmrx/aux/queue_server.h>
#include <cmrx/ipc/rpc.h>
// Create a cross-process queue
queue_id_t shared_queue;
rpc_call(queue_server, create, &shared_queue, sizeof(struct MyData), 32);
// Use it from any application
struct MyData data = {42, "Hello"};
rpc_call(queue_server, send, shared_queue, &data);
Queues are fundamental building blocks for robust embedded systems, providing clean separation between data collection, processing, and output while maintaining system responsiveness.