Personal Programming Notes

To err is human; to debug, divine.

Improved Priority Queue Recipe in Python

A priority queue is a commonly used abstract data type, but it is not adequately provided in Python’s standard library.

The module Queue provides a PriorityQueue class but that implementation leaves a lot to be desired. It does not provide standard peek or remove methods in its public interface, which is sometimes critical. Additionally, the entry must be in the tuple form (priority_number, data) where lower number must be used for higher priority task to be returned first. Finally, this Queue version is reportedly slower because it adds locks and encapsulation designed for multi-threaded environment, which is arguably the intention of that module.

On the other hand, the module heapq provides an implementation of binary heap algorithms, which is the most common data structure for implementing priority-queue. Although the module does not provide any direct implementation of priority-queue, its documentation discusses how to add additional capabilities to a heap-based priority queue and provides a recipe as an example. That example is still hard to be used directly since it is not encapsulated into a class and the standard peek method is noticeably missing.

I ended up implementing a wrapper class for that recipe to make it easier to use.

Improved priority-queue recipe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import heapq
import itertools

class PriorityQueue(object):

    _REMOVED = "<REMOVED>"

    def __init__(self):
        self.heap = []
        self.entries = {}
        self.counter = itertools.count()

    def add(self, task, priority=0):
        """Add a new task or update the priority of an existing task"""
        if task in self.entries:
            self.remove(task)

        count = next(self.counter)
        # weight = -priority since heap is a min-heap
        entry = [-priority, count, task]
        self.entries[task] = entry
        heapq.heappush(self.heap, entry)
        pass

    def remove(self, task):
        """ Mark the given task as REMOVED.

        Do this to avoid breaking heap-invariance of the internal heap.
        """
        entry = self.entries[task]
        entry[-1] = PriorityQueue._REMOVED
        pass

    def pop(self):
        """ Get task with highest priority.

        :return: Priority, Task with highest priority
        """
        while self.heap:
            weight, count, task = heapq.heappop(self.heap)
            if task is not PriorityQueue._REMOVED:
                del self.entries[task]
                return -weight, task
        raise KeyError("The priority queue is empty")

    def peek(self):
        """ Check task with highest priority, without removing.

        :return: Priority, Task with highest priority
        """
        while self.heap:
            weight, count, task = self.heap[0]
            if task is PriorityQueue._REMOVED:
                heapq.heappop(self.heap)
            else:
                return -weight, task

        return None

    def __str__(self):
        temp = [str(e) for e in self.heap if e[-1] is not PriorityQueue._REMOVED]
        return "[%s]" % ", ".join(temp)

Comparing to the recipe provided in heapq module, a few notes about this implementation:

  • Task with higher priority goes out first. A simple change will remove lots of confusion (and bugs) associated with min-heap implementations.
  • Methods and supporting data structures are encapsulated into a class.
  • Method names are simplified to add, remove, pop (instead of add_task, for example) since priority queues are NOT only used for task scheduling.
  • Method peek is added.
  • Method pop and peek return the highest-priority task together with its priority number. The task’s priority number can be useful sometimes (see Skyline problem below).
  • Override __str__ method for pretty printing.

As an example, the above priority-queue implementation is used to solve the Skyline problem. The Skyline problem states that:

You are given a set of n rectangular buildings on a skyline. Find the outline around that set of rectangles, which is the skyline when silhouetted at night.

An image of example input and output

One possible approach is to use a priority queue to keep track of the current highest building while moving from left to right and adding/removing buildings at key points (i.e., start and end of buildings). Compared to the Merge-Sort-like approach detailed in this link, this approach is much more intuitive in my opinion while having similar runtime complexity $\mathcal{O}(n\log{}n)$.

Solution to Skyline problem
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def solve_skyline(mlist):
    """ Solve the Skyline problem.

    :param mlist: list of buildings in format (start, end, height).
    :return: List of end points
    """

    skyline = []
    cur_height = 0
    pq = PriorityQueue()
    events = defaultdict(list)
    START = "start"
    END = "end"

    for idx, building in enumerate(mlist):
        start, end, height = building
        events[start].append((idx, START))
        events[end].append((idx, END))

    # k_events is the ordered list of x-coordinates where buildings start or end (events)
    k_events = sorted(events.keys())

    # Add and remove buildings into a priority-queue for each event.
    for key in k_events:
        # print skyline
        buildings = events[key]

        for e in buildings:
            idx, label = e
            if label == START:
                pq.add(idx, mlist[idx][2])
            elif label == END:
                pq.remove(idx)

        # after processing all buildings for a x-coordinate "key", check the current highest building
        temp = pq.peek()
        new_height = 0
        if temp is not None:
            new_height = temp[0]
        if new_height != cur_height:
            skyline.append((key, new_height))
            cur_height = new_height

    return skyline