Show / Hide Table of Contents

Class ParallelConsumerQueue<T>

A simple, threadsafe and blocking queue. Contains methods to easily have multiple, identical consumer threads.

Inheritance
object
ParallelConsumerQueue<T>
Inherited Members
object.ToString()
object.Equals(object)
object.Equals(object, object)
object.ReferenceEquals(object, object)
object.GetHashCode()
object.GetType()
object.MemberwiseClone()
Namespace: SuperOfficeThreading
Assembly: SoCore.dll
Syntax
public class ParallelConsumerQueue<T> where T : class
Type Parameters
Name Description
T
Remarks
  • Enqueue, Dequeue are thread-safe
  • If the queue is empty, Dequeue will block instead of throwing an exception
  • If the queue is "full" (maxCapacity specified in constructor has been reached), then Enqueue will block
The purpose of this class is to be used in a producer/consumer scenario, where the producer thread adds to the queue while the consumer thread feeds off the queue. The producer thread should block if the queue becomes too large, while the consumer thread should block until work becomes available.

There is no explicit method to signal "end-of-processing" to the consumer, but pushing NULL on the queue as an end marker is a simple way to do it - generally valid queue items will not be null.

The StartConsumers(int, Func<T, CancellationToken, Task>, CancellationToken) and WaitForConsumers() methods are used if you have a consumer method to be run in its own thread, and you want one or more such to be easily started and stopped.

Constructors

ParallelConsumerQueue(int)

Initialize a queue with a maximum capacity; attempts to add more elements will block until capacity becomes available

Declaration
public ParallelConsumerQueue(int maxCapacity)
Parameters
Type Name Description
int maxCapacity

Maximum capacity until enqueue blocks

Methods

Enqueue(T)

Add an element, and signal the internal semaphore

Declaration
public void Enqueue(T item)
Parameters
Type Name Description
T item

Item to add to queue

Exceptions
Type Condition
InvalidOperationException

Thrown if Enqueue is called after termination has started.

StartConsumers(int, Func<T, CancellationToken, Task>, CancellationToken)

Start n simultaneous consumer threads, each running the same code. The consumer methods have to take a single parameter, a queue item, and are called whenever there is something on the queue (items are SuperOffice.Threading.ParallelConsumerQueue`1.Dequeue'd and the consumer method invoked once per item). null items cannot be used as they signal end-of-work on the queue

Declaration
public void StartConsumers(int numberOfParallelTasks, Func<T, CancellationToken, Task> consumerActionAsync, CancellationToken cancellationToken = default)
Parameters
Type Name Description
int numberOfParallelTasks

Number of consumer threads to run

FuncCancellationTokenTask consumerActionAsync

Method to be invoked per item to be consumed - the method should not call SuperOffice.Threading.ParallelConsumerQueue`1.Dequeue as that has already been done

CancellationToken cancellationToken

WaitForConsumers()

Push a null (end mark) on the queue, for each known consumer task. This will cause the consumer tasks to finish

Declaration
public Task WaitForConsumers()
Returns
Type Description
Task
Remarks

It is not permitted to Enqueue(T) more items to the queue after calling this method.

Extension Methods

EnumUtil.MapEnums<From, To>(From)
Converters.MapEnums<From, To>(From)
© SuperOffice. All rights reserved.
SuperOffice |  Community |  Release Notes |  Privacy |  Site feedback |  Search Docs |  About Docs |  Contribute |  Back to top