Priority Queue with a Twist

Recently I needed a variation of a Priority Queue in C# that supported concurrency.

My requirement for a Priority Queue

  • I would receive a lot of jobs in a random order
  • A job would contain a JobNumber indicating in which order they should be executed
  • The first JobNumber would be zero
  • All JobNumbers would be present (e.g. number 7 would not be missing)
  • Several worker threads would¬†simultaneously¬†be retrieving jobs from the queue, hence the need for concurrency

The Solution

For the clarity of the example, I have simplified the Job class to look like:

public class Job
{
	public int jobNumber;
}

My Concurrent Priority Queue would look like:

class ConPQueue
{
	private const int TIMEOUT = 10000; // 10 seconds
	private SortedList<int, Job> list;
	private int nextJobNumber;
	private static EventWaitHandle waitHandle = new AutoResetEvent(false);
	private Boolean completeAdding = false;

	public ConPQueue()
	{
		this.list = new SortedList<int, Job>();
		this.nextJobNumber = 0;
	}

	public Boolean add(Job job)
	{
		Boolean status = false;
		if (Monitor.TryEnter(this.list, TIMEOUT))
		{
			try
			{
				this.list.Add(job.jobNumber, job);
				// Notify the take() method that new values are present
				waitHandle.Set();
			}
			finally
			{
				Monitor.Exit(this.list);
			}
		}
		return status;
	}

	public Job take()
	{
		Job job = null;
		while (job == null)
		{
			// If next job is not present, wait until notified
			// that new jobs have been added to the list
			while (!hasKey(this.nextJobNumber))
			{
				waitHandle.WaitOne();
			}
			if (Monitor.TryEnter(this.list, TIMEOUT))
			{
				try
				{
					this.list.TryGetValue(this.nextJobNumber, out job);
					if (job != null)
					{
						// Only remove item on successfull retrieval
						this.list.Remove(this.nextJobNumber++);
					}
				}
				finally
				{
					Monitor.Exit(this.list);
				}
			}
		}
		return job;
	}

	public Boolean hasKey(int Key)
	{
		Boolean exists = false;
		if (Monitor.TryEnter(this.list, TIMEOUT))
		{
			try
			{
				exists = this.list.ContainsKey(Key);
			}
			finally
			{
				Monitor.Exit(this.list);
			}
		}
		return exists;
	}

	public void addingComplete()
	{
		completeAdding = true;
	}

	public Boolean isCompleted
	{
		get { return completeAdding && list.Count == 0; }
	}
}

To add a Job to the Queue:

queue.take();

To retrieve a Job from the Queue:

queue.add(myJob);

Hope this can inspire fellow developers :-)