Class ParallelConsumerQueue<T>
A simple, thread-safe, blocking implementation of Queue for ordinary operations. Note: please read the remarks to understand
how this class differs from its base class.
Namespace: SuperOffice.Threading
Assembly: SoCore.dll
Syntax
public class ParallelConsumerQueue<T> : Queue<T> where T : class
Type Parameters
Name | Description |
---|---|
T |
Remarks
This class behaves slightly differently from its base class, as follows:
- Enqueue, Dequeue and Count are thread-safe
- If the queue is empty, Dequeue will block instead of throwing an exception
- If the queue is "full" (maxCapacity specified in constructor has been reached), then Enqueue will block
- Peek will throw an exception if called while the queue is empty
- Enumeration, and thus all kinds of LINQ operations, are not threadsafe
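The blocking behaviour listed above can be sketched with a lock and two semaphores. This is a minimal stand-in written for illustration, not the SuperOffice implementation: the class name `BlockingQueueSketch<T>` and its fields are hypothetical, and the use of a second semaphore to track free slots is an assumption (the documentation only mentions one internal semaphore).

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in, NOT the SuperOffice implementation: it only mirrors
// the documented blocking behaviour of Enqueue, Dequeue and Count.
public sealed class BlockingQueueSketch<T> where T : class
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly object _gate = new object();
    private readonly SemaphoreSlim _itemsAvailable = new SemaphoreSlim(0);
    private readonly SemaphoreSlim _freeSlots; // stays null when there is no upper limit

    public BlockingQueueSketch() { }

    public BlockingQueueSketch(int maxCapacity)
    {
        if (maxCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(maxCapacity));
        _freeSlots = new SemaphoreSlim(maxCapacity, maxCapacity);
    }

    public int Count
    {
        get { lock (_gate) { return _items.Count; } }
    }

    public void Enqueue(T item)
    {
        _freeSlots?.Wait();                    // blocks while the queue is "full"
        lock (_gate) { _items.Enqueue(item); }
        _itemsAvailable.Release();             // wake one waiting consumer
    }

    public T Dequeue()
    {
        _itemsAvailable.Wait();                // blocks while the queue is empty
        T item;
        lock (_gate) { item = _items.Dequeue(); }
        _freeSlots?.Release();                 // one slot has become free
        return item;
    }
}
```

Waiting on the semaphores outside the lock keeps a blocked producer or consumer from holding the lock while it waits, so the other side can still make progress.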
Constructors
ParallelConsumerQueue()
Initialize a queue with no upper limit on the number of elements
Declaration
public ParallelConsumerQueue()
ParallelConsumerQueue(Int32)
Initialize a queue with a maximum capacity; attempts to add more elements will block until capacity becomes available
Declaration
public ParallelConsumerQueue(int maxCapacity)
Parameters
Type | Name | Description |
---|---|---|
Int32 | maxCapacity | Maximum capacity; queue is also preallocated to this size so insertions are O(1) |
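To see what a bounded capacity means in practice, the sketch below uses `BlockingCollection<T>` from the .NET base class library purely as an analogue; it is not `ParallelConsumerQueue<T>`, and the `BoundedCapacityDemo` class is a hypothetical name. A timed `TryAdd` stands in for the add that would otherwise block, so the "full" state can be observed without hanging.

```csharp
using System;
using System.Collections.Concurrent;

public static class BoundedCapacityDemo
{
    // Returns the outcome of four add attempts against a collection bounded at 2 items.
    public static bool[] Run()
    {
        using (var bounded = new BlockingCollection<string>(boundedCapacity: 2))
        {
            bool first = bounded.TryAdd("a");
            bool second = bounded.TryAdd("b");
            // Full: a plain Add would block here, so use a timeout to observe it.
            bool whileFull = bounded.TryAdd("c", TimeSpan.FromMilliseconds(50));
            bounded.Take();                        // frees one slot
            bool afterTake = bounded.TryAdd("c");
            return new[] { first, second, whileFull, afterTake };
        }
    }
}
```

With a capacity of 2, only the third attempt fails, and it succeeds once an item has been taken.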
Properties
Count
Return the number of items in the queue
Declaration
public int Count { get; }
Property Value
Type | Description |
---|---|
Int32 |
Methods
Consume(Int32, Action<T>)
Factory method for a disposable context object, suitable for use in a using statement
Declaration
public ParallelConsumerQueue<T>.ConsumerContext Consume(int threadCount, Action<T> consumerAction)
Parameters
Type | Name | Description |
---|---|---|
Int32 | threadCount | |
Action<T> | consumerAction |
Returns
Type | Description |
---|---|
ParallelConsumerQueue<T>.ConsumerContext |
Remarks
An alternative to manually calling StartConsumers(Int32, Action<T>) and StopConsumers() is to use this method
to obtain a disposable context object. Entering the using block starts the consumers. Exiting it causes the
main thread to block until all the work items in the queue have been consumed: end marks are pushed and the consumer
threads terminate. The main thread will then continue past the end of the using scope.
ParallelConsumerQueue<BulkImportInfo> queue = new ParallelConsumerQueue<BulkImportInfo>(5);
// start the consumer threads, and then iterate over incoming data, queuing up batches
using (queue.Consume(NumShipoutThreads, WriteBulkRows))
{
    // pull data from the enumerable
    DataTable batchBuffer = null;
    foreach (object[] row in rows)
    {
        ...
        // if the batch is full, then ship it via the queue (blocks if the queue is "full")
        if (rowsInBatch == ShipoutBatchSize)
        {
            queue.Enqueue(new BulkImportInfo() { Data = batchBuffer, Table = table });
            rowsInBatch = 0;
        }
    }
}
...
protected override void WriteBulkRows(BulkImportInfo importInfo)
{
    using (SqlBulkCopy bcp = new SqlBulkCopy(rawConn))
    {
        bcp.DestinationTableName = SoDatabase.GetCurrent().TablePrefix + "." + importInfo.Table.DbName;
        bcp.WriteToServer(importInfo.Data);
    }
}
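The start-on-enter, drain-on-exit behaviour described in the remarks can be sketched as a small disposable wrapper. This is an illustration under stated assumptions, not the actual ConsumerContext: `ConsumerScopeSketch<T>` is a hypothetical name, and `BlockingCollection<T>` from the .NET base class library stands in for the queue so that the block is self-contained.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical sketch of the pattern behind Consume(); not the SuperOffice code.
public sealed class ConsumerScopeSketch<T> : IDisposable where T : class
{
    private readonly BlockingCollection<T> _queue;
    private readonly Thread[] _workers;

    public ConsumerScopeSketch(BlockingCollection<T> queue, int threadCount, Action<T> consumerAction)
    {
        _queue = queue;
        _workers = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++)
        {
            _workers[i] = new Thread(() =>
            {
                // Take() blocks while the queue is empty; null is the end mark.
                for (T item = _queue.Take(); item != null; item = _queue.Take())
                    consumerAction(item);
            });
            _workers[i].Start();
        }
    }

    public void Dispose()
    {
        // One end mark per worker, queued behind any remaining work items.
        for (int i = 0; i < _workers.Length; i++) _queue.Add(null);
        // Block the caller until every worker has seen its end mark and exited.
        foreach (Thread worker in _workers) worker.Join();
    }
}
```

Because the end marks are added after the last work item and the queue is first-in, first-out, every work item is taken before any worker sees a null. This is also why null cannot be used as a regular work item.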
Dequeue()
Wait until the semaphore signals that an item is available, then lock the queue and dequeue an item
Declaration
public T Dequeue()
Returns
Type | Description |
---|---|
T | Oldest element in queue, which is removed from it |
Enqueue(T)
Lock the queue, add an element, and signal the internal semaphore
Declaration
public void Enqueue(T item)
Parameters
Type | Name | Description |
---|---|---|
T | item | Item to add to queue |
Peek()
Return the first element of the queue if available, otherwise throw an exception like the base class
Declaration
public T Peek()
Returns
Type | Description |
---|---|
T | Oldest element in queue, but does not modify queue contents |
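Since Peek does not block, a caller has to be ready for the empty case. Checking Count first and then calling Peek is not reliable when other threads are dequeuing, because the queue can become empty between the two calls. The sketch below shows the catch-based alternative on a plain `Queue<string>`, whose Peek throws InvalidOperationException when empty. It assumes that ParallelConsumerQueue<T> throws the same exception type, which the documentation implies ("like the base class") but does not name. `PeekDemo` and `PeekOrNull` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

public static class PeekDemo
{
    // Returns the oldest item without removing it, or null when the queue is empty.
    public static string PeekOrNull(Queue<string> queue)
    {
        try
        {
            return queue.Peek();
        }
        catch (InvalidOperationException)
        {
            // Queue<T>.Peek throws InvalidOperationException on an empty queue.
            return null;
        }
    }
}
```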
StartConsumers(Int32, Action<T>)
Start n simultaneous consumer threads, each running the same code. The consumer method must take a single parameter, a queue item, and is called whenever there is something on the queue: each item is removed with Dequeue() and the consumer method is invoked once per item. null items cannot be used as work items, because they signal end-of-work on the queue
Declaration
public AsyncContext[] StartConsumers(int numberOfThreads, Action<T> consumerAction)
Parameters
Type | Name | Description |
---|---|---|
Int32 | numberOfThreads | Number of consumer threads to run |
Action<T> | consumerAction | Method to be invoked per item to be consumed - the method should not call |
Returns
Type | Description |
---|---|
AsyncContext[] |
StopConsumers()
Push one null (end mark) onto the queue for each known consumer thread
Declaration
public void StopConsumers()