Recently I needed a variation of a Priority Queue in C# that supported concurrency.
My requirements for the Priority Queue:
- I would receive a lot of jobs in a random order
- Each job would contain a JobNumber indicating the order in which it should be executed
- The first JobNumber would be zero
- All JobNumbers would be present (e.g. number 7 would not be missing)
- Several worker threads would simultaneously be retrieving jobs from the queue, hence the need for concurrency
The Solution
For the clarity of the example, I have simplified the Job class to look like:
/// <summary>
/// Minimal unit of work used by the example: it carries nothing but its
/// position in the execution order.
/// </summary>
public class Job
{
    /// <summary>
    /// Zero-based sequence number stating when this job should run
    /// relative to the other jobs.
    /// </summary>
    public int jobNumber;
}
My Concurrent Priority Queue would look like:
/// <summary>
/// Thread-safe queue that hands out jobs strictly in ascending
/// <c>jobNumber</c> order, starting at zero, regardless of the order in
/// which they were added. Consumers block in <see cref="take"/> until the
/// next job in sequence is available.
/// </summary>
class ConPQueue
{
    private const int TIMEOUT = 10000; // 10 seconds

    // Single gate guarding every field below. It is also the monitor that
    // consumers wait on, so a wake-up can never be lost between checking
    // the list and going to sleep.
    private readonly object sync = new object();
    private readonly SortedList<int, Job> list;
    private int nextJobNumber;
    private Boolean completeAdding = false;

    /// <summary>Creates an empty queue whose first expected job number is 0.</summary>
    public ConPQueue()
    {
        this.list = new SortedList<int, Job>();
        this.nextJobNumber = 0;
    }

    /// <summary>
    /// Inserts a job, keyed by its <c>jobNumber</c>, and wakes all waiting consumers.
    /// </summary>
    /// <param name="job">The job to enqueue; must not be null.</param>
    /// <returns>
    /// <c>true</c> when the job was inserted; <c>false</c> when the lock could
    /// not be acquired within <c>TIMEOUT</c> milliseconds.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="job"/> is null.</exception>
    /// <exception cref="ArgumentException">A job with the same number is already queued.</exception>
    public Boolean add(Job job)
    {
        if (job == null)
        {
            throw new ArgumentNullException("job");
        }

        if (!Monitor.TryEnter(this.sync, TIMEOUT))
        {
            return false;
        }

        try
        {
            this.list.Add(job.jobNumber, job);

            // Wake every waiting consumer: each one re-checks under the lock
            // whether the job it is waiting for has arrived.
            Monitor.PulseAll(this.sync);
            return true;
        }
        finally
        {
            Monitor.Exit(this.sync);
        }
    }

    /// <summary>
    /// Removes and returns the next job in sequence, blocking until it has
    /// been added.
    /// </summary>
    /// <returns>
    /// The job whose number equals the next expected job number, or
    /// <c>null</c> when <see cref="addingComplete"/> has been called and that
    /// job is not in the queue (it can no longer arrive).
    /// </returns>
    public Job take()
    {
        lock (this.sync)
        {
            Job job;
            while (!this.list.TryGetValue(this.nextJobNumber, out job))
            {
                if (this.completeAdding)
                {
                    // No producer will ever supply the missing job, so
                    // release the worker instead of blocking forever.
                    return null;
                }

                // Releases the lock while waiting and re-acquires it before
                // returning, so the loop condition is always checked safely.
                Monitor.Wait(this.sync);
            }

            // Only remove the item and advance after a successful retrieval.
            this.list.Remove(this.nextJobNumber);
            this.nextJobNumber++;
            return job;
        }
    }

    /// <summary>Reports whether a job with the given number is currently queued.</summary>
    /// <param name="Key">The job number to look for.</param>
    /// <returns>
    /// <c>true</c> when present; <c>false</c> when absent or when the lock
    /// could not be acquired within <c>TIMEOUT</c> milliseconds.
    /// </returns>
    public Boolean hasKey(int Key)
    {
        if (!Monitor.TryEnter(this.sync, TIMEOUT))
        {
            return false;
        }

        try
        {
            return this.list.ContainsKey(Key);
        }
        finally
        {
            Monitor.Exit(this.sync);
        }
    }

    /// <summary>
    /// Signals that no further jobs will be added and wakes any consumers
    /// blocked in <see cref="take"/> so they can observe completion.
    /// </summary>
    public void addingComplete()
    {
        lock (this.sync)
        {
            this.completeAdding = true;
            Monitor.PulseAll(this.sync);
        }
    }

    /// <summary>
    /// <c>true</c> once <see cref="addingComplete"/> has been called and every
    /// queued job has been taken.
    /// </summary>
    public Boolean isCompleted
    {
        get
        {
            lock (this.sync)
            {
                return this.completeAdding && this.list.Count == 0;
            }
        }
    }
}
To add a Job to the Queue:
queue.add(myJob);
To retrieve a Job from the Queue:
queue.take();
Hope this can inspire fellow developers