Approximate Counting

Probabilistic techniques for counting

By Vamshi Jandhyala in mathematics

December 1, 2020


Our algorithm must monitor a sequence of events, then at any given time output (an estimate of) the number of events thus far. More formally, this is a data structure maintaining a single integer $n$ and supporting the following two operations:

  • update(): increment $n$ by 1
  • query(): output (an estimate of) $n$

Before any operations are performed, it is assumed that $n$ starts at $0$. Of course a trivial algorithm maintains $n$ using $O(\log n)$ bits of memory (a counter). Our goal is to use much less space than this. It is not too hard to prove that it is impossible to solve this problem exactly using $o(\log n)$ bits of space. Thus we would like to answer query() with some estimate $\hat{n}$ of $n$ satisfying

$$ \mathbb{P}(|\hat{n} - n| > \epsilon n) < \delta $$

for some $0 < \epsilon$, $\delta < 1$ that are given to the algorithm up front.

Morris Algorithm

The algorithm works as follows:

  1. Initialize $X \leftarrow 0$.
  2. For each update, increment $X$ with probability $\frac{1}{2^X}$.
  3. For a query, output $\hat{n} = 2^X - 1$.

Intuitively, the variable $X$ attempts to store a value that is $\approx \log_2 n$, so it can be represented with only about $\log_2 \log_2 n$ bits.


Define $X_n$ to be the value of $X$ after $n$ updates. Then $\mathbb{E}[2^{X_n} - 1] = n$.


The proof is by induction on $n$. Base case: for $n = 0$ we have $X_0 = 0$, so $\mathbb{E}[2^{X_0}] = 1 = n + 1$. For the inductive hypothesis, assume $\mathbb{E}[2^{X_n}] = n + 1$. We now argue the inductive step. We have

$$ \begin{align*} \mathbb{E}[2^{X_{n+1}}] &= \sum_{j=0}^\infty \mathbb{P}[X_n=j] \cdot \mathbb{E}[2^{X_{n+1}} \mid X_n=j] \\
&= \sum_{j=0}^\infty \mathbb{P}[X_n=j] \cdot \left( 2^j\left(1-\frac{1}{2^j}\right) + \frac{1}{2^j} \cdot 2^{j+1} \right) \\
&= \sum_{j=0}^\infty \mathbb{P}[X_n=j]\,2^j + \sum_{j=0}^\infty \mathbb{P}[X_n=j] \\
&= \mathbb{E}[2^{X_{n}}] + 1 \\
&= (n+1) + 1 \end{align*} $$

It is now clear from the above that $\hat{n} = 2^X - 1$ is an unbiased estimator of $n$.

Python implementation

from random import random

class Morris:
    def __init__(self):
        self.x = 0  # X attempts to track log2(n)

    def update(self):
        # Increment X with probability 1/2^X.
        if random() < 1 / 2**self.x:
            self.x += 1

    def query(self):
        # Unbiased estimate of the number of updates.
        return 2**self.x - 1

morris = Morris()
n = 10000000
for _ in range(n):
    morris.update()
n_hat = morris.query()
print("Actual n: %d, Estimated n: %d" % (n, n_hat))
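As a quick empirical check of the unbiasedness claim (this experiment is an addition, not part of the original algorithm; `morris_estimate` is a standalone restatement of the counter logic above), we can average many independent runs of the Morris counter and confirm the mean lands near $n$:

```python
from random import random, seed

def morris_estimate(n):
    """Simulate one Morris counter for n updates; return the estimate 2^X - 1."""
    x = 0
    for _ in range(n):
        if random() < 1 / 2**x:
            x += 1
    return 2**x - 1

seed(0)  # fixed seed so the experiment is reproducible
n, trials = 100, 20000
avg = sum(morris_estimate(n) for _ in range(trials)) / trials
print("average of %d estimates for n = %d: %.2f" % (trials, n, avg))
```

Any single estimate has standard deviation on the order of $n$, but the average over 20,000 trials concentrates tightly around $n = 100$.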

Morris+ Algorithm

To decrease the failure probability of Morris’ basic algorithm, we instantiate $s$ independent copies of Morris’ algorithm and average their outputs. That is, we obtain independent estimators $\hat{n}_1, \dots , \hat{n}_s$ from independent instantiations of Morris’ algorithm, and our output to a query is

$$ \hat{n} = \frac{1}{s} \sum_{i=1}^s \hat{n}_i $$
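Why averaging helps: a sketch of the standard variance argument (not derived in these notes, so the constants here should be taken as a claim to verify). A direct computation gives $\mathbb{E}[2^{2X_n}] = \frac{3}{2}n^2 + \frac{3}{2}n + 1$, hence

$$ \mathrm{Var}[\hat{n}_i] = \mathbb{E}[2^{2X_n}] - (n+1)^2 = \frac{n^2 - n}{2} < \frac{n^2}{2}. $$

Averaging $s$ independent copies divides the variance by $s$, so Chebyshev's inequality gives

$$ \mathbb{P}(|\hat{n} - n| > \epsilon n) < \frac{n^2/(2s)}{\epsilon^2 n^2} = \frac{1}{2s\epsilon^2}, $$

which is below $\delta$ once $s \geq \frac{1}{2\epsilon^2 \delta}$. Note the $1/\delta$ dependence in the number of copies.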

Python implementation

class Morris_plus:
    def __init__(self, s):
        self.s = s
        self.estimators = [Morris() for _ in range(s)]

    def update(self):
        for est in self.estimators:
            est.update()

    def query(self):
        # Average the s independent estimates.
        return sum(est.query() for est in self.estimators) / self.s

morris_p = Morris_plus(10)
n = 10000000
for _ in range(n):
    morris_p.update()
n_hat = morris_p.query()
print("Actual n: %d, Estimated n: %d" % (n, n_hat))

Morris++ Algorithm

It turns out there is a simple technique (which we will see often) to reduce the dependence on the failure probability $\delta$ from $1/\delta$ to $\log(1/\delta)$. The technique is as follows: run $t$ independent instantiations of the Morris+ algorithm and output the median of their estimates.
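A sketch of why the median helps (the standard argument, assuming $s$ is chosen so that each Morris+ copy is within $\epsilon n$ of $n$ with probability at least $2/3$): let $Y_i = \mathbf{1}\{|\hat{n}_i - n| > \epsilon n\}$, so $\mathbb{E}[Y_i] \leq 1/3$. The median estimate is bad only if at least half of the $t$ copies fail simultaneously, and by a Chernoff bound

$$ \mathbb{P}\left(\sum_{i=1}^t Y_i \geq \frac{t}{2}\right) \leq e^{-ct} $$

for some constant $c > 0$, so $t = \Theta(\log(1/\delta))$ instantiations suffice to drive the failure probability below $\delta$.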

Python implementation

from statistics import median

class Morris_plus_plus:
    def __init__(self, s, t):
        self.t = t
        self.estimators = [Morris_plus(s) for _ in range(t)]

    def update(self):
        for est in self.estimators:
            est.update()

    def query(self):
        # Median of the t independent Morris+ estimates.
        return median(est.query() for est in self.estimators)

morris_pp = Morris_plus_plus(10, 10)
n = 100000
for _ in range(n):
    morris_pp.update()
n_hat = morris_pp.query()
print("Actual n: %d, Estimated n: %d" % (n, n_hat))

Reference Lecture Notes