Skip to content

T27256: 当前队列中位数

data structures, OOP, http://cs101.openjudge.cn/practice/27256/

中位数是有序序列最中间的那个数。如果序列的长度是偶数,则没有最中间的数;此时中位数是最中间的两个数的平均数。

现在,假设我有一个空的list,我会对他进行以下三种操作:

  1. 在list最后添加一个数:add x
  2. 删除当前list的第一个数:del
  3. 查询当前list的中位数:query

输入

输入为若干行,第一行为一个整数n(n <= 100000),表示操作的次数,接下来n行表示n次操作。数据保证在删除或查询的时候,队列长度大于0

输出

针对于每次query,输出当前的中位数

样例输入

sample1 in:
5
add 1
add 2
query
del
query

sample1 out:
1.5
2

样例输出

sample2 input:
5
add 1
query
add 3
query
query

sample2 out:
1
2
2

提示

tags: sorting, data structure。注意输出格式如样例所示。

来源

2023fall zzr

给出一种基于双堆(大根堆和小根堆)及延迟删除的做法,既能在队列操作(FIFO)下维护数据,又能高效查询中位数。

python
import sys
import heapq
from collections import deque, defaultdict

input = sys.stdin.readline

class DualHeap:
    def __init__(self):
        # 大根堆(存放较小的一半,用负数模拟)
        self.small = []  
        # 小根堆(存放较大的一半)
        self.large = []  
        # 延迟删除的记录
        self.delayed = defaultdict(int)
        # 两个堆中有效数据的个数
        self.small_size = 0  
        self.large_size = 0  

    def prune(self, heap):
        # 清理堆顶已经被延迟删除的元素,不再调整size(因为在remove中已扣减)
        if heap is self.small:
            while heap and self.delayed[-heap[0]] > 0:
                num = -heapq.heappop(heap)
                self.delayed[num] -= 1
        else:
            while heap and self.delayed[heap[0]] > 0:
                num = heapq.heappop(heap)
                self.delayed[num] -= 1

    def balance(self):
        # 保持 small 比 large 多 0 或 1 个有效元素
        if self.small_size > self.large_size + 1:
            self.prune(self.small)
            num = -heapq.heappop(self.small)
            self.small_size -= 1
            heapq.heappush(self.large, num)
            self.large_size += 1
        elif self.small_size < self.large_size:
            self.prune(self.large)
            num = heapq.heappop(self.large)
            self.large_size -= 1
            heapq.heappush(self.small, -num)
            self.small_size += 1

    def add(self, num):
        # 插入新数时,根据大小决定进入哪个堆
        if not self.small or num <= -self.small[0]:
            heapq.heappush(self.small, -num)
            self.small_size += 1
        else:
            heapq.heappush(self.large, num)
            self.large_size += 1
        self.balance()

    def remove(self, num):
        # 延迟删除:标记待删除,同时减少对应堆的有效元素数量
        self.delayed[num] += 1
        if self.small and num <= -self.small[0]:
            self.small_size -= 1
            if num == -self.small[0]:
                self.prune(self.small)
        else:
            self.large_size -= 1
            if self.large and num == self.large[0]:
                self.prune(self.large)
        self.balance()

    def median(self):
        # 查询中位数前先清理堆顶
        self.prune(self.small)
        self.prune(self.large)
        total = self.small_size + self.large_size
        if total % 2 == 1:
            return -self.small[0]
        else:
            return (-self.small[0] + self.large[0]) / 2

if __name__ == '__main__':
    n = int(input())
    dh = DualHeap()
    # 使用 deque 记录入队顺序,确保del操作删除最先添加的数
    q = deque()
    results = []
    for _ in range(n):
        parts = input().split()
        op = parts[0]
        if op == 'add':
            x = int(parts[1])
            dh.add(x)
            q.append(x)
        elif op == 'del':
            x = q.popleft()
            dh.remove(x)
        elif op == 'query':
            med = dh.median()
            # 若中位数为整数则去除小数部分
            if med == int(med):
                results.append(str(int(med)))
            else:
                results.append(str(med))
    print("\n".join(results))

代码说明

  1. DualHeap 类:
    • 使用两个堆:small(大根堆,用负数存储)和 large(小根堆)。
    • 通过延迟删除字典 delayed 记录待删除的元素,避免在堆中直接查找删除,节省时间。
    • small_sizelarge_size 分别记录堆中实际有效的元素数量。
  2. 操作函数:
    • add(num): 根据数值大小将 num 插入到对应的堆中,然后调用 balance 进行平衡。
    • remove(num):num标记为延迟删除,同时减少相应堆的计数,再进行堆顶清理和平衡。
    • median(): 先清理堆顶,再根据当前元素个数奇偶返回中位数(若为偶数则返回两个堆顶的平均值)。
  3. 主函数:
    • 通过 deque 保存添加的顺序,保证“del”操作始终删除最先添加的数。
    • 每次执行 query 时调用 median() 并按照要求输出。
python
import collections
import heapq


class DualHeap:
    def __init__(self):
        # 大根堆,维护较小的一半元素,注意 python 没有大根堆,需要将所有元素取相反数并使用小根堆
        self.small = list()
        # 小根堆,维护较大的一半元素
        self.large = list()
        # 哈希表,记录「延迟删除」的元素,key 为元素,value 为需要删除的次数
        self.delayed = collections.Counter()

        # small 和 large 当前包含的元素个数,需要扣除被「延迟删除」的元素
        self.smallSize = 0
        self.largeSize = 0

    # 不断地弹出 heap 的堆顶元素,并且更新哈希表
    def prune(self, heap):
        while heap:
            num = heap[0]
            if heap is self.small:
                num = -num
            if num in self.delayed:
                self.delayed[num] -= 1
                if self.delayed[num] == 0:
                    self.delayed.pop(num)
                heapq.heappop(heap)
            else:
                break

    # 调整 small 和 large 中的元素个数,使得二者的元素个数满足要求
    def make_balance(self):
        if self.smallSize > self.largeSize + 1:
            # small 比 large 元素多 2 个
            heapq.heappush(self.large, -self.small[0])
            heapq.heappop(self.small)
            self.smallSize -= 1
            self.largeSize += 1
            # small 堆顶元素被移除,需要进行 prune
            self.prune(self.small)
        elif self.smallSize < self.largeSize:
            # large 比 small 元素多 1 个
            heapq.heappush(self.small, -self.large[0])
            heapq.heappop(self.large)
            self.smallSize += 1
            self.largeSize -= 1
            # large 堆顶元素被移除,需要进行 prune
            self.prune(self.large)

    def insert(self, num):
        if not self.small or num <= -self.small[0]:
            heapq.heappush(self.small, -num)
            self.smallSize += 1
        else:
            heapq.heappush(self.large, num)
            self.largeSize += 1
        self.make_balance()

    def erase(self, num):
        self.delayed[num] += 1
        if num <= -self.small[0]:
            self.smallSize -= 1
            if num == -self.small[0]:
                self.prune(self.small)
        else:
            self.largeSize -= 1
            if num == self.large[0]:
                self.prune(self.large)
        self.make_balance()

    def get_median(self):
        return -self.small[0] if self.smallSize != self.largeSize else (-self.small[0] + self.large[0]) / 2


n = int(input())
q = DualHeap()
l = []
start_idx = 0
for _ in range(n):
    operation = input()
    if operation == 'query':
        ans = q.get_median()
        if round(ans) == ans:
            print(int(ans))
        else:
            print(ans)
    elif operation == 'del':
        q.erase(l[start_idx])
        start_idx += 1
    else:
        t = int(operation.split()[1])
        q.insert(t)
        l.append(t)

修改了时间限制,现在二分的做法应该是超时。因为这个方法最坏时间复杂度是O(n^2)。

python
# 2300011742 张展皓 化院
from bisect import bisect_left
a, b, cnt, now = [], [], 0, 0
for _ in range(int(input())):
    opt = input().split()
    if opt[0] == 'query':
        l = len(a)
        if l & 1: print(a[l >> 1][0])
        else:
            ans = (a[l >> 1][0] + a[l - 1 >> 1][0]) / 2 
            print(ans if int(ans) != ans else int(ans))
    if opt[0] == 'add':
        v = int(opt[1])
        a.insert(bisect_left(a, [v, 0]), [v, cnt])
        b.append(v)
        cnt += 1
    if opt[0] == 'del': 
        v = b[now]
        now += 1
        a.pop(bisect_left(a, [v, 0]))

沈俊丞25。AC后,用AI修了一下。

python
from heapq import heappop, heappush
from collections import deque
import sys
input = sys.stdin.readline

def lazy_delete(heap, del_num):
    # 清理堆顶那些逻辑上已被删除(index < del_num)的元素
    while heap and heap[0][1] < del_num:
        heappop(heap)

n = int(input().strip())
arr = deque()        # 存 (value, idx)
heap_low = []        # 小根堆:存“较大的一半”(直接存 value, idx)
heap_high = []       # 用负值表示的大根堆:存“较小的一半”(存 -value, idx)

del_num = 0          # 已删除计数阈值(所有 index < del_num 的元素视为已删除)
addi = 0             # 插入编号,用作唯一索引

# 记录每个 idx 当前属于哪一侧('low' 或 'high')
pos = {}

# 有效计数(不含堆中尚未物理弹出的过期项)
size_low = 0
size_high = 0

for _ in range(n):
    order = input().strip()
    if not order:
        continue

    # 每次操作前先清理堆顶过期项(保证堆顶可用)
    lazy_delete(heap_low, del_num)
    lazy_delete(heap_high, del_num)

    if order[0] == 'a':  # add x
        x = int(order.split()[1])
        arr.append((x, addi))

        # 决定放哪一堆:保持 heap_low 存较大一半(min-heap), heap_high 存较小一半(max via -v)
        if not heap_low or x >= heap_low[0][0]:
            heappush(heap_low, [x, addi])
            pos[addi] = 'low'
            size_low += 1
        else:
            heappush(heap_high, [-x, addi])
            pos[addi] = 'high'
            size_high += 1

        # 根据**有效计数**平衡两堆:允许 heap_low 比 heap_high 多 1
        if size_low > size_high + 1:
            # 从 low 移动到 high
            lazy_delete(heap_low, del_num)
            v, i = heappop(heap_low)
            heappush(heap_high, [-v, i])
            pos[i] = 'high'
            size_low -= 1
            size_high += 1
        elif size_high > size_low:
            # 从 high 移动到 low
            lazy_delete(heap_high, del_num)
            vneg, i = heappop(heap_high)
            v = -vneg
            heappush(heap_low, [v, i])
            pos[i] = 'low'
            size_high -= 1
            size_low += 1

        addi += 1

    elif order[0] == 'd':  # del
        # 题目保证删除时队列非空
        val, idx_del = arr.popleft()
        # 标记逻辑删除的阈值
        del_num += 1

        # 减少对应堆的有效计数(我们在插入时用 pos 记录了它属哪堆)
        side = pos.get(idx_del)
        if side == 'low':
            size_low -= 1
        elif side == 'high':
            size_high -= 1
        # 删除 pos 记录(此元素已逻辑删除)
        if idx_del in pos:
            del pos[idx_del]

        # 根据有效计数重新平衡(同上)
        if size_low > size_high + 1:
            lazy_delete(heap_low, del_num)
            v, i = heappop(heap_low)
            heappush(heap_high, [-v, i])
            pos[i] = 'high'
            size_low -= 1
            size_high += 1
        elif size_high > size_low:
            lazy_delete(heap_high, del_num)
            vneg, i = heappop(heap_high)
            v = -vneg
            heappush(heap_low, [v, i])
            pos[i] = 'low'
            size_high -= 1
            size_low += 1

    else:  # query
        # 如果 low 比 high 多一个,则中位数是 low 的堆顶(最小的右侧元素)
        if size_low > size_high:
            # low 存原值
            print(heap_low[0][0])
        else:
            a = heap_low[0][0]
            b = -heap_high[0][0]
            s = a + b
            if s % 2 == 0:
                print(s // 2)
            else:
                # 输出 1.5 之类的一位小数(避免 2.0)
                print(f"{s / 2:.1f}")

【马健文 25 元陪】

1. 问题概述

给定一个动态变化的队列,支持三种操作:

  1. add x:在队列末尾添加元素 x
  2. del:删除队列开头的元素
  3. query:查询当前队列的中位数

需要实现一个高效的数据结构,能在最多 100,000 次操作中快速响应查询请求。

2. 核心思路:双堆 + 延迟删除

数据结构:

使用双端队列(deque)来维护主体的队列

使用两个堆来维护队列元素:

  • 最大堆(maxheap):存储队列中较小的一半元素,堆顶是这部分的最大值
  • 最小堆(minheap):存储队列中较大的一半元素,堆顶是这部分的最小值

这样设计可以保证:

  • 当队列大小为奇数时,中位数就是最大堆的堆顶元素
  • 当队列大小为偶数时,中位数是最大堆堆顶和最小堆堆顶的平均值
python
dequeue = deque([])        # 实际的队列,用于按顺序存储元素
maxheap = []               # 最大堆(存储较小一半元素)
minheap = []               # 最小堆(存储较大一半元素)
maxheap_size = 0           # 最大堆中有效元素数量
minheap_size = 0           # 最小堆中有效元素数量
head = 0                   # 队列头部指针
tail = 0                   # 队列尾部指针

延迟删除

传统的双堆算法支持高效插入和查询,但删除任意元素(特别是队列头部)比较困难。我们需要一种机制来处理队首元素的删除。

实现方法
  • 不直接从堆中删除元素,而是通过标记为"无效"
  • 查询时跳过无效元素,保持堆结构的完整性
  • 避免频繁的堆重建操作,提高效率

此处的具体解决方案:给每个元素标记一个"位置索引",通过维护 headtail 指针来判断元素是否仍在队列中。

3. 算法详细设计

1. 元素插入(add 操作)

python
def add(x):
    # 1. 将元素加入实际队列
    dequeue.append(x)
    
    # 2. 确定元素应该加入哪个堆
    C = leftmax()  # 获取最大堆的最大值(较小一半的最大值)
    if C == None or x <= C:
        # 如果元素小于等于最大堆的最大值,加入最大堆
        heappush(maxheap, (-x, tail))  # 存储负值以实现最大堆
        maxheap_size += 1
    else:
        # 否则加入最小堆
        heappush(minheap, (x, tail))
        minheap_size += 1
    
    # 3. 更新尾部指针
    tail += 1
    
    # 4. 重新平衡两个堆的大小
    rebalance()

插入策略:新元素根据与最大堆堆顶的比较结果,决定加入哪个堆,保证最大堆的所有元素 ≤ 最小堆的所有元素。

2. 元素删除(del 操作)

这是算法中最复杂的部分,需要处理多种情况:

python
def delete():
    # 1. 从实际队列中弹出队首元素
    x = dequeue.popleft()
    
    # 2. 获取当前两个堆的有效极值
    L = leftmax()  # 最大堆的最大值
    R = rightmin() # 最小堆的最小值
    
    # 3. 判断被删除元素属于哪个堆
    if maxheap and minheap:
        if x < R:
            # 如果x小于最小堆的最小值,一定在最大堆
            maxheap_size -= 1
        elif x > L:
            # 如果x大于最大堆的最大值,一定在最小堆
            minheap_size -= 1 
        else:
            # 如果x等于某个极值,需要根据位置判断
            if head == maxheap[0][1]:
                maxheap_size -= 1
            elif head == minheap[0][1]:
                minheap_size -= 1
    elif maxheap:
        # 只有最大堆有元素
        maxheap_size -= 1
    elif minheap:
        # 只有最小堆有元素
        minheap_size -= 1
    
    # 4. 更新头部指针
    head += 1
    
    # 5. 重新平衡
    rebalance()

删除判断逻辑

  • 比较被删除元素与两个堆的极值
  • 小于最小堆最小值 → 在最大堆
  • 大于最大堆最大值 → 在最小堆
  • 等于极值 → 检查堆顶元素位置是否匹配被删除位置

3. 查询中位数(query 操作)

python
def query():
    # 1. 获取当前两个堆的有效极值
    left = leftmax()
    right = rightmin()
    
    # 2. 根据堆的大小计算中位数
    if not maxheap_size and not minheap_size:
        return None
    if maxheap_size == minheap_size:
        # 偶数个元素:取平均值
        return (left + right) / 2
    # 奇数个元素:取最大堆的堆顶(因为最大堆多一个元素)
    return left

4. 辅助函数

返回堆顶的同时清理无效元素

python
def leftmax():
    # 清理最大堆顶部的无效元素
    while maxheap and not inqueue(maxheap[0][1]):
        heappop(maxheap)
    if maxheap:
        return -maxheap[0][0]  # 返回正值
    return None

def rightmin():
    # 清理最小堆顶部的无效元素
    while minheap and not inqueue(minheap[0][1]):
        heappop(minheap)
    if minheap:
        return minheap[0][0]
    return None

重新平衡双堆 rebalance

python
def rebalance():
    # 1. 先清理两个堆顶的无效元素
    leftmax()
    rightmin()
    
    # 2. 调整大小差不超过1,保证最大堆的大小 >= 最小堆的大小
    while maxheap_size > minheap_size + 1:
        # 从最大堆移动元素到最小堆
        value, pos = heappop(maxheap)
        value = -value
        heappush(minheap, (value, pos))
        maxheap_size -= 1
        minheap_size += 1
        
    while maxheap_size < minheap_size:
        # 从最小堆移动元素到最大堆
        value, pos = heappop(minheap)
        value = -value
        heappush(maxheap, (value, pos))
        maxheap_size += 1
        minheap_size -= 1

4. 最终代码:

python
from collections import deque
from heapq import *
dequeue = deque([])
maxheap = []
minheap = []
maxheap_size = minheap_size = 0
head = 0
tail = 0

def inqueue(pos):
    return head <= pos < tail

def leftmax():
    while maxheap and not inqueue(maxheap[0][1]):
        heappop(maxheap)
    if maxheap:
        return -maxheap[0][0]
    return None

def rightmin():
    while minheap and not inqueue(minheap[0][1]):
        heappop(minheap)
    if minheap:
        return minheap[0][0]
    return None

def rebalance():
    global head,tail,maxheap_size,minheap_size
    
    leftmax()
    rightmin()
    while maxheap_size > minheap_size + 1:
        leftmax()
        value,pos = heappop(maxheap)
        value = -value
        heappush(minheap,(value,pos))
        maxheap_size -= 1
        minheap_size += 1
    while maxheap_size < minheap_size:
        rightmin()
        value,pos = heappop(minheap)
        value = -value
        heappush(maxheap,(value,pos))
        maxheap_size += 1
        minheap_size -= 1
        
def add(x):
    global head,tail,maxheap_size,minheap_size
    dequeue.append(x)
    
    C = leftmax()
    if C == None or x <= C:
        heappush(maxheap,(-x,tail))
        maxheap_size += 1
    else:
        heappush(minheap,(x,tail))
        minheap_size += 1
    
    tail += 1

    rebalance()

def delete():
    global head,tail,maxheap_size,minheap_size
    x = dequeue.popleft()
    L = leftmax()
    R = rightmin()

    if maxheap and minheap:
        if x < R:
            maxheap_size -= 1
        elif x > L:
            minheap_size -= 1 
        else:
            if head == maxheap[0][1]:
                maxheap_size -= 1
            elif head == minheap[0][1]:
                minheap_size -= 1
            else:
                print('Error: popped element mismatched!')
    elif maxheap:
        maxheap_size -= 1
    elif minheap:
        minheap_size -= 1
    else:
        print('Error: deleting from empty heaps')
    
    head += 1
    
    rebalance()

def query():
    left = leftmax()
    right = rightmin()

    if not maxheap_size and not minheap_size:
        return None
    if maxheap_size == minheap_size:
        return (left+right)/2
    
    return left

n = int(input())
results = []
for _ in range(n):
    a = input().split()
    if a[0] == 'add':
        add(int(a[1]))
    elif a[0] == 'del':
        delete()
    else:
        result = query()
        if isinstance(result, float) and result.is_integer():
            results.append(str(int(result)))
        else:
            results.append(str(result))
print('\n'.join(results))

5. 注意事项

需要注意的点很多!

(1) dequeue的head和tail

由于一开始的时候初始化head = tail = 0

且每个元素在插入时获得一个唯一的索引(tail值)

所以inqueue函数判断的时候应该是:

head <= pos < tail

这样正好和左闭右开原则匹配

(2) 先进行判断左右再进行size变化

由于通过head和tail判断inqueue,所以应该先进行判断左右再进行size变化,不然的话你都取不出来这个元素(直接被清理了)

(3) 快读快写防止TLE

(4) 保留小数的问题:

需要判断是否是float,如果是float能不能变成整数(isinstance

python
result = query()
    if isinstance(result, float) and result.is_integer():
        results.append(str(int(result)))
    else:
        results.append(str(result))

(5) delete的时候有重复元素的问题

边界处的重复元素需要判断在两侧究竟哪一边存在。由于heapq的最小堆性质,相同大小的元素中,一定是索引最小的元素出现在堆顶的位置,自然也就满足了从tail进行删除的要求

python
	# 3. 判断被删除元素属于哪个堆
    if maxheap and minheap:
        if x < R:
            # 如果x小于最小堆的最小值,一定在最大堆
            maxheap_size -= 1
        elif x > L:
            # 如果x大于最大堆的最大值,一定在最小堆
            minheap_size -= 1 
        else:
            # 此时x == L == R,需要根据位置判断
            if head == maxheap[0][1]:
                maxheap_size -= 1
            elif head == minheap[0][1]:
                minheap_size -= 1
    elif maxheap:
        # 只有最大堆有元素
        maxheap_size -= 1
    elif minheap:
        # 只有最小堆有元素
        minheap_size -= 1

(6) rebalance的时候得把堆顶清理了,不要对无效元素进行转移

6.复杂度分析

时间复杂度:

操作时间复杂度说明
addO(log n)堆插入操作
delO(log n)堆调整操作
queryO(1)直接访问堆顶

其中 n 是当前队列中的元素数量。每个操作都涉及到堆的调整,但通过延迟删除策略,避免了最坏情况下的 O(n) 删除操作。

空间复杂度

O(n),需要存储所有队列元素及其位置信息。