剑指offer 面试题41. 数据流中的中位数(难)& LeetCode 295. 数据流的中位数(难)

由于数据是从一个数据流中读出来的,因而数据的数目随着时间的变化而增加。如果用一个数据容器来保存从流中读出来的数据,则当有新的数据从流中读出来时,这些数据就插入数据容器。这个数据容器用什么数据结构定义最合适呢?这个题的本质就是在探讨,用哪一种数据结构保存数据流时间复杂度最小。

Question

如何得到一个数据流中的中位数?如果从数据流中读出奇数个数值,那么中位数就是所有数值排序之后位于中间的数值。如果从数据流中读出偶数个数值,那么中位数就是所有数值排序之后中间两个数的平均值。

例如, [2,3,4] 的中位数是 3 [2,3] 的中位数是 (2 + 3) / 2 = 2.5 设计一个支持以下两种操作的数据结构:

  • void addNum(int num) - 从数据流中添加一个整数到数据结构中。
  • double findMedian() - 返回目前所有元素的中位数。

示例 1:

1
2
3
4
输入:
["MedianFinder","addNum","addNum","findMedian","addNum","findMedian"]
[[],[1],[2],[],[3],[]]
输出:[null,null,null,1.50000,null,2.00000]
示例 2:
1
2
3
4
输入:
["MedianFinder","addNum","findMedian","addNum","findMedian"]
[[],[2],[],[3],[]]
输出:[null,null,2.00000,null,2.50000]
限制:最多会对 addNumfindMedia进行 50000 次调用。

测试用例

功能测试(从数据流中读出奇数个数字;从数据流中读出偶数个数字)。 边界值测试(从数据流中读出0个、1个、2个数字)

本题考点

考查应聘者对时间复杂度的分析能力。 考查应聘者对数据结构的理解程度。应聘者只有对各个常用数据容器的特点非常了解,知道它们的优缺点及适用场景,才能找出最优的解法。

Intuition

由于数据是从一个数据流中读出来的,因而数据的数目随着时间的变化而增加。如果用一个数据容器来保存从流中读出来的数据,则当有新的数据从流中读出来时,这些数据就插入数据容器。这个数据容器用什么数据结构定义最合适呢?这个题的本质就是在探讨,用哪一种数据结构保存数据流时间复杂度最小。

几种容器比较

作为容器的数据结构 addNum的时间复杂度 findMeadia时间复杂度
没排序数组 \(O(1)\) \(O(n)\)
排序数组 \(O(n)\) \(O(1)\)
排序链表+指针 \(O(n)\) \(O(1)\)
二叉搜索树 平均\(O(logn)\),最差\(O(n)\) 平均\(O(logn)\),最差\(O(n)\)
AVL数 \(O(logn)\) \(O(1)\)
最大堆和最小堆 \(O(logn)\) \(O(1)\)

如何使用堆作为容器

4

我们注意到整个数据容器被分隔成两部分。位于容器左边部分的数据比右边的数据小。另外,\(P_1\) 指向的数据是左边部分最大的数,\(P_2\)指向的数据是左边部分最小的数。

如果能够保证数据容器左边的数据都小于右边的数据,那么即使左、右两边内部的数据没有排序,也可以根据左边最大的数及右边最小的数得到中位数。如何快速从一个数据容器中找出最大数?用最大堆实现这个数据容器,因为位于堆顶的就是最大的数据。同样,也可以快速从最小堆中找出最小数。

因此,用一个最大堆实现左边的数居容器,用一个最小堆实现右边的数据容器。往堆中插入一个数据的时间效率是 \(O(logn)\)。由于只需要 \(O(1)\) 时间就可以得到位于堆顶的数据,因此得到中位数的时间复杂度是 \(O(1)\)

接下来考虑用最大堆和最小堆实现的一些细节。首先要保证数据平均分配到两个堆中,因此两个堆中数据的数目之差不能超过1。为了实现平均分配,可以在数据的总数目是偶数时把新数据插入最小堆,否则插入最大堆。

还要保证最大堆中的所有数据都要小于最小堆中的数据。当数据的总数目是偶数时,按照前面的分配规则会把新的数据插入最小堆。如果此时这个新的数据比最大堆中的一些数据要小,那该怎么办呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
if len(nums) & 1 == 0:  # 偶数,把新数据插入最小堆。
if num > max_heap_root:
min_heap.addNum(num)
else:
max_heap.addNum(num) # 先把这个新的数据插入最大堆
max_heap_root = max_heap.removeRoot() # 最大堆中最大的数字拿出来插入最小堆
min_heap.addNum(max_heap_root)
else: # 奇数,把新数据插入最大堆。
if num < min_heap_root:
max_heap.addNum(num)
else:
min_heap.addNum(num)
min_heap_root = min_heap.removeRoot()
min_heap.addNum(min_heap_root)

可以先把这个新的数据插入最大堆,接着把最大堆中最大的数字拿出来插入最小堆。由于最终插入最小堆的数字是原最大堆中最大的数字,这样就保证了最小堆中所有数字都大于最大堆中的数字。

Code

  • 自行构造了基于 list 的大根堆和小根堆来存储字节流
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
class MedianFinder:

def __init__(self):
"""
initialize your data structure here.
"""
self.left = []
self.right = []
self.count = 0


def addNum(self, num: int) -> None:
if self.count & 1 == 0: # even 偶数,把新数据插入大根堆,self.left。
self.left.append(num)
else: # odd 奇数,把新数据插入小根堆,self.right。
self.right.append(num)
self.count += 1

def addNum1(self, num: int) -> None:
# 本实现和题解相反,当然这样也可以。
# 偶数插入大根堆,奇数插入小根堆。
if self.count & 1 == 0: # even 偶数,把新数据插入大根堆,self.left。
if len(self.left) == 0:
self.left.append(num)
elif num < self.right[0]:
self.left.append(num)
# self.maxHeap(self.left)
else:
# 先把这个新的数据插入最小堆
self.right.append(num)
self.minHeap(self.right) # 调整小根堆
# 获取小根堆的根节点
min_heap_root = self.right[0]
del self.right[0]
# self.minHeap(self.right) # 调整小根堆
# 插入大根堆
self.left.append(min_heap_root)
# self.maxHeap(self.left) # 调整大根堆
else: # odd 奇数,把新数据插入小根堆,self.right。
if len(self.right) == 1:
self.right.append(num)
elif num > self.left[0]:
self.right.append(num)
# self.minHeap(self.right) # 调整小根堆
else:
# 先把这个新的数据插入最大堆
self.left.append(num)
self.maxHeap(self.left) # 调整大根堆
# 获取大根堆的根节点
max_heap_root = self.left[0]
del self.left[0]
# self.maxHeap(self.left) # 调整大根堆
# 插入小根堆
self.right.append(max_heap_root)
# self.minHeap(self.right) # 调整大根堆
self.count += 1


def findMedian(self) -> float:
if self == 1:
return self.left[0]
self.maxHeap(self.left) # 调整大根堆
self.minHeap(self.right) # 调整小根堆
# 调整大根和小根
if len(self.left) and len(self.right) and (self.left[0] > self.right[0]):
self.left[0], self.right[0] = self.right[0], self.left[0]
self.maxHeap(self.left) # 在调整了大根和小根之后,再次调整大根堆
self.minHeap(self.right) # 在调整了大根和小根之后,再次调整小根堆
if self.count & 1 == 0: # even
return (self.left[0] + self.right[0]) / 2.0
else: # odd
return self.left[0]

# 构造大根堆
def maxHeap(self, _list):
length = len(_list)
if _list == None or length < 1:
return
if length == 1:
return _list
for i in range(length//2-1, -1, -1):
k = i
tmp, heap = _list[k], False
while not heap and 2*k < length-1:
index = 2*k + 1
if index < length - 1:
if _list[index] < _list[index + 1]:
index += 1
if tmp >= _list[index]:
heap = True
else:
_list[k] = _list[index]
k = index
_list[k] = tmp

# 构造小根堆
def minHeap(self, _list):
length = len(_list)
if _list == None or length < 1:
return
if length == 1:
return _list
for i in range(length//2-1, -1, -1):
k = i
tmp, heap = _list[k], False
while not heap and 2*k < length-1:
index = 2*k + 1
if index < length - 1:
if _list[index] > _list[index + 1]:
index += 1
if tmp <= _list[index]:
heap = True
else:
_list[k] = _list[index]
k = index
_list[k] = tmp

自建heap会超出LeetCode的时间限制

  • 调用系统的 heap 来存储字节流。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from heapq import *

class MedianFinder:
def __init__(self):
self.A = [] # 小顶堆,保存较大的一半
self.B = [] # 大顶堆,保存较小的一半

def addNum(self, num: int) -> None:
if len(self.A) != len(self.B):
heappush(self.A, num)
heappush(self.B, -heappop(self.A))
else:
heappush(self.B, -num)
heappush(self.A, -heappop(self.B))

def findMedian(self) -> float:
return self.A[0] if len(self.A) != len(self.B) else (self.A[0] - self.B[0]) / 2.0

调用系统的heap就不会超出时间限制。

Push item on the heap, then pop and return the smallest item from the heap. The combined action runs more efficiently than heappush() followed by a separate call to heappop().

根据以上文档说明,可将 Python 代码优化为,效率最高!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from heapq import *

class MedianFinder:
def __init__(self):
self.A = [] # 小顶堆,保存较大的一半
self.B = [] # 大顶堆,保存较小的一半

def addNum(self, num: int) -> None:
if len(self.A) != len(self.B):
heappush(self.B, -heappushpop(self.A, num))
else:
heappush(self.A, -heappushpop(self.B, -num))

def findMedian(self) -> float:
return self.A[0] if len(self.A) != len(self.B) else (self.A[0] - self.B[0]) / 2.0
作者:jyd 链接:https://leetcode-cn.com/problems/shu-ju-liu-zhong-de-zhong-wei-shu-lcof/solution/mian-shi-ti-41-shu-ju-liu-zhong-de-zhong-wei-shu-y/ 来源:力扣(LeetCode) 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。