## Problem

Our algorithm must monitor a sequence of events, then at any given time output (an estimate of) the number of events thus far. More formally, this is a data structure maintaining a single integer $n$ and supporting the following two operations:

• update(): increment $n$ by 1
• query(): output (an estimate of) $n$

Before any operations are performed, it is assumed that $n$ starts at $0$. Of course a trivial algorithm maintains $n$ using $O(\log n)$ bits of memory (a counter). Our goal is to use much less space than this. It is not too hard to prove that it is impossible to solve this problem exactly using $o(\log n)$ bits of space. Thus we would like to answer query() with some estimate $\hat{n}$ of $n$ satisfying

$$\mathbb{P}(|\hat{n} − n| > \epsilon n) < \delta$$

for some $0 < \epsilon$, $\delta < 1$ that are given to the algorithm up front.

## Morris Algorithm

The algorithm works as follows:

1. Initialize $X \leftarrow 0$.
2. For each update, increment $X$ with probability $\frac{1}{2^X}$.
3. For a query, output $\hat{n} = 2^X − 1$.

Intuitively, the variable $X$ is attempting to store a value that is $\approx \log_2 n$.

### Analysis

Define $X_n$ = value of $X$ after $n$ events. Then $\mathbb{E}[2^{X_n} − 1] = n$.

#### Proof

The proof is by induction on $n$. The basis of the induction: for $n = 0, X = 0$ and therefore $X_0 = 1 = n + 1$. So assume for inductive hypothesis that $\mathbb{E}[2^{X_n}] = n + 1$. We now argue the inductive step. We have

\begin{align*} \mathbb{E}[2^{X_{n+1}}] &= \sum_{j=0}^\infty \mathbb{P}[X_n=j] \cdot \mathbb{E}[2^{X_{n+1}}|X_n=j]\ &= \sum_{j=0}^\infty \mathbb{P}[X_n=j] \cdot \left( 2^j(1-\frac{1}{2^j}) + \frac{1}{2^j} \cdot 2^{j+1} \right) \ &= \sum_{j=0}^\infty \mathbb{P}[X_n=j]2^j + \sum_{j=0}^\infty \mathbb{P}[X_n=j]\ &= \mathbb{E}[2^{X_{n}}] + 1\ & = (n+1) + 1 \end{align*}

It is now clear from the above that $\hat{n} = 2^X − 1$ is an unbiased estimator of $n$.

### Python implementation

from random import random

class Morris:
def __init__(self):
self.x = 0

def update(self):
if random() < 1/2**self.x:
self.x += 1

def query(self):
return 2**self.x -1

morris = Morris()
n = 10000000
for _ in range(n):
morris.update()
n_hat = morris.query()
print("Actual n:%d, Estimated n:%d" % (n, n_hat))


## Morris+ Algorithm

To decrease the failure probability of Morris’ basic algorithm, we instantiate $s$ independent copies of Morris’ algorithm and average their outputs. That is, we obtain independent estimators $\hat{n}_1, \dots , \hat{n}_s$ from independent instantiations of Morris’ algorithm, and our output to a query is

$$\hat{n} = \sum_{i=1}^s \hat{n}_i$$

### Python implementation

class Morris_plus:
def __init__(self, s):
self.s = s
self.estimators = [Morris() for _ in range(s)]

def update(self):
for est in self.estimators:
est.update()

def query(self):
return sum([est.query() for est in self.estimators])/self.s

morris_p = Morris_plus(10)
n = 10000000
for _ in range(n):
morris_p.update()
n_hat = morris_p.query()
print("Actual n:%d, Estimated n:%d" % (n, n_hat))


## Morris++ Algorithm

It turns out there is a simple technique (which we will see often) to reduce the dependence on the failure probability $δ$ from $1/δ$ to $log(1/δ)$. The technique is as follows. We run $t$ instantiations of Morris+ algorithm and then output the median estimate from all the instantiations.

### Python implementation

from statistics import median

class Morris_plus_plus:
def __init__(self, s, t):
self.t = t
self.estimators = [Morris_plus(s) for _ in range(t)]

def update(self):
for est in self.estimators:
est.update()

def query(self):
return median([est.query() for est in self.estimators])

morris_pp = Morris_plus_plus(10,10)
n = 100000
for _ in range(n):
morris_pp.update()
n_hat = morris_pp.query()
print("Actual n:%d, Estimated n:%d" % (n, n_hat))


## Reference

Sketchingbigdata.org Lecture Notes