Class ParallelConsumerQueue<T>
A simple, thread-safe, blocking queue. It includes methods for starting and stopping multiple identical consumer threads.
Namespace: SuperOffice.Threading
Assembly: SoCore.dll
Syntax
public class ParallelConsumerQueue<T> where T : class
Type Parameters
| Name | Description |
|---|---|
| T | Type of the items held in the queue; must be a reference type |
Remarks
- Enqueue and Dequeue are thread-safe.
- If the queue is empty, Dequeue blocks instead of throwing an exception.
- If the queue is "full" (the maxCapacity specified in the constructor has been reached), Enqueue blocks.
There is no explicit method to signal "end-of-processing" to the consumer, but pushing null onto the queue as an end marker is a simple way to do it, since valid queue items are generally not null.
Use the StartConsumers(int, Func<T, CancellationToken, Task>, CancellationToken) and WaitForConsumers() methods when you have a consumer method that should run in its own thread and you want to start and stop one or more such consumers easily.
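The remarks above can be sketched as a short producer/consumer round trip. This is a minimal sketch, not a definitive usage pattern: it assumes the project references SoCore.dll, and the class name `ParallelConsumerQueueExample`, the helper `RunAsync`, the capacity of 100, the four consumers, and the sample items are all hypothetical. It also assumes that awaiting the task returned by WaitForConsumers() completes once the consumers have finished, which the method name suggests but this page does not state.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using SuperOffice.Threading; // assumption: SoCore.dll is referenced

public static class ParallelConsumerQueueExample
{
    // Hypothetical helper: enqueues a few strings and returns how many were consumed.
    public static async Task<int> RunAsync(CancellationToken cancellationToken = default)
    {
        var processed = 0;

        // Enqueue blocks once 100 items are waiting in the queue.
        var queue = new ParallelConsumerQueue<string>(100);

        // All four consumers run the same callback, so shared state is updated atomically.
        // The callback receives an item that has already been dequeued.
        queue.StartConsumers(4, async (item, token) =>
        {
            await Task.Delay(10, token); // placeholder for real work on the item
            Interlocked.Increment(ref processed);
        }, cancellationToken);

        foreach (var item in new[] { "a", "b", "c", "d", "e" })
        {
            queue.Enqueue(item); // items must be non-null; null is the end marker
        }

        // Pushes one end marker per consumer. Assumption: the returned task
        // completes when the consumers have finished.
        await queue.WaitForConsumers();

        return processed;
    }
}
```

Because WaitForConsumers() pushes the end markers itself, the producer in this sketch never enqueues null manually; that is only needed when consumers call Dequeue directly.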
Constructors
ParallelConsumerQueue(int)
Initializes a queue with a maximum capacity. Attempts to add more elements block until capacity becomes available.
Declaration
public ParallelConsumerQueue(int maxCapacity)
Parameters
| Type | Name | Description |
|---|---|---|
| int | maxCapacity | Maximum number of queued items; once it is reached, Enqueue blocks |
Methods
Enqueue(T)
Add an element, and signal the internal semaphore
Declaration
public void Enqueue(T item)
Parameters
| Type | Name | Description |
|---|---|---|
| T | item | Item to add to queue |
Exceptions
| Type | Condition |
|---|---|
| InvalidOperationException | Thrown if Enqueue is called after termination has started. |
StartConsumers(int, Func<T, CancellationToken, Task>, CancellationToken)
Starts n simultaneous consumer threads, each running the same code. The consumer method takes a queue item and a CancellationToken, and is called whenever there is something on the queue (items are dequeued and the consumer method is invoked once per item). null items cannot be used, as they signal end-of-work on the queue.
Declaration
public void StartConsumers(int numberOfParallelTasks, Func<T, CancellationToken, Task> consumerActionAsync, CancellationToken cancellationToken = default)
Parameters
| Type | Name | Description |
|---|---|---|
| int | numberOfParallelTasks | Number of consumer threads to run |
| Func<T, CancellationToken, Task> | consumerActionAsync | Method to be invoked per item to be consumed. The method should not call Dequeue, as that has already been done |
| CancellationToken | cancellationToken | |
WaitForConsumers()
Pushes one null (end marker) onto the queue for each known consumer task, which causes the consumer tasks to finish.
Declaration
public Task WaitForConsumers()
Returns
| Type | Description |
|---|---|
| Task | |
Remarks
Calling Enqueue(T) to add more items after this method has been called is not permitted.