πŸ’Έ

EIP 1559 Market on Polymarket

This is an analysis of the probability of the London hard fork happening after August 4th at 23:59 ET

Here's the output of the simulation as of July 14. In 99.7% of the simulations, the fork happens after midnight. The most common hour when it happens is between 3am and 4am, this happens 28.4% of the time. The 25th and 75th percentiles are roughtly between 2.87 and 4.68 hours past midnight.

percent success 0.99753 avg margin hours 3.773372597222222 25th percentile: 2.8722222222222222 75th percentile 4.680833333333333 -3 1 -2 20 -1 226 0 1729 1 7410 2 18782 3 28439 4 25417 5 13106 6 4050 7 726 8 87 9 7

The way it works is first we download all the blocks from the past month. It assumes that the block time distribution will remain the same in the future as in the past. We then use all the block times to do a monte Carlo simulation of when the actual fork will happen.

import datetime
import json
import math
import random
import requests
import time

TARGET_BLOCK = 12965000
TARGET_TIME = 1628135999 #August 5th at midnight ET
GETH_NODE = None #Change this

def make_req(params):
    params = json.dumps(params)
    headers = {'Content-Type': 'application/json'}
    while True:
        try:
            r = requests.post(GETH_NODE, headers=headers, data=params)
            return json.loads(r.text)
        except Exception as e:
            print("EXCEPTION!!!")
            print(e)
            time.sleep(10)
            print("done sleeping")

def latest_block():
    params = {"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":83}
    res = make_req(params)
    return int(res['result'], 16)

def get_block_time(block_num):
    params = {"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":[hex(block_num), True],"id":1}
    res = make_req(params)
    return int(res['result']['timestamp'], 16)

def print_time(epoch, label):
    if False:
        return
    e = datetime.datetime.utcfromtimestamp(epoch)
    print(label, e)

def load_cache():
    with open("block_data.json", 'r') as f:
        data = json.load(f)

    block_data = {}
    for k in data:
        block_data[int(k)] = data[k]
    return block_data

def download_blocks():
    print("starting download")
    block_data = load_cache()

    latest_block_num = latest_block()
    blocks_to_go = TARGET_BLOCK - latest_block_num
    past_block_num = latest_block_num - blocks_to_go

    i = 0
    for cur_block in range(past_block_num, latest_block_num):
        if i % 100 == 0:
            print(i)
        if i % 10000 == 0:
            print('blocks to go', latest_block_num - cur_block)
            with open('block_data.json', 'w') as f:
                json.dump(block_data, f)
                print("done writing to file")
        if cur_block not in block_data:
            block_data[cur_block] = get_block_time(cur_block)
        i += 1

    with open('block_data.json', 'w') as f:
        json.dump(block_data, f)

def analyze():
    # Run a monte Carlo Simulation
    block_data = load_cache()
    largest_block = max(block_data.keys())
    smallest_block = min(block_data.keys())
    blocks_to_go = TARGET_BLOCK - largest_block
    past_block_num = largest_block - blocks_to_go
    print('past 1', past_block_num)
    past_block_num = max(past_block_num, smallest_block)
    print('smallest block', smallest_block)

    print('blocks to go', blocks_to_go)

    times = []

    for cur_block_num in range(past_block_num + 1, largest_block + 1):
        diff = block_data[cur_block_num] - block_data[cur_block_num - 1]
        times.append(diff)

    def simulate():
        cur_time = block_data[largest_block]
        for i in range(blocks_to_go):
            cur_time += random.choice(times)
        return cur_time - TARGET_TIME

    def get_percentile(arr, percentile):
        return sorted(arr)[int(len(arr) * percentile)]


    def get_prob(percentile1, percentile2):
        ITERS = 1000
        success = 0
        hour_success = 0
        histogram = {}
        results = []
        for i in range(ITERS):
            result = simulate()
            print(result / 3600)
            results.append(result)
            if result > 0:
                success += 1
            hour = math.floor(result / 3600)
            histogram[hour] = histogram.get(hour, 0) + 1
        avg = sum(results) / len(results)
        return (success / ITERS,
                avg,
                get_percentile(results, percentile1),
                get_percentile(results, percentile2),
                histogram)

    return get_prob(0.25, 0.75)

def calculate_margin():
    latest_block_num = latest_block()
    blocks_to_go = TARGET_BLOCK - latest_block_num
    in_the_past_block = latest_block_num - blocks_to_go

    print_time(TARGET_TIME, 'target')

    latest_time = get_block_time(latest_block_num)
    past_block = get_block_time(in_the_past_block)
    print_time(latest_time, 'latest')
    print_time(past_block, 'past')

    time_diff = latest_time - past_block

    estimate = time_diff + latest_time

    print_time(estimate, 'estimate')

    margin = estimate - TARGET_TIME

    return margin

download_blocks()
percent_success, avg, p1, p2, histogram = analyze()
print('percent success', percent_success)
print('avg margin hours', avg / 3600)
print('25th percentile', p1 / 3600)
print('75th percentile', p2 / 3600)
for hour in sorted(histogram.keys()):
    print(hour, histogram[hour])

import requests
import time
import json
import random

NUM_SAMPLES = 10

TARGET_BLOCK = 12965000
TARGET_TIME_MIDNIGHT = 1628135999 #August 5th at midnight ET
TARGET_TIME_10am = 1628135999 + 3600 * 10
BNUM = 12900000 + 2048 * 2
BET_BLOCK = 12918600

def make_req(params):
    params = json.dumps(params)
    headers = {'Content-Type': 'application/json'}
    while True:
        try:
            r = requests.post('geth node', headers=headers, data=params)
            return json.loads(r.text)
        except Exception as e:
            print("EXCEPTION!!!")
            print(e)
            time.sleep(10)
            print("done sleeping")

def latest_block():
    params = {"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":83}
    res = make_req(params)
    return int(res['result'], 16)

def get_block_time(block_num):
    params = {"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":[hex(block_num), True],"id":1}
    res = make_req(params)
    return int(res['result']['timestamp'], 16)

def get_params_bet():
    latest = latest_block()
    latest_block_time = get_block_time(latest)
    blocks_past = latest - BNUM
    blocks_future = TARGET_BLOCK - latest
    num_bet_blocks = TARGET_BLOCK - BET_BLOCK
    target_time = get_block_time(BET_BLOCK) + 13.70 * num_bet_blocks
    avg_future = (target_time - latest_block_time) / blocks_future
    avg_past = (latest_block_time - get_block_time(BNUM)) / blocks_past
    return (blocks_past, blocks_future, avg_future, avg_past)

def get_params_10am():
    latest = latest_block()
    latest_block_time = get_block_time(latest)
    blocks_past = latest - BNUM
    blocks_future = TARGET_BLOCK - latest
    avg_future = (TARGET_TIME_10am - latest_block_time) / blocks_future
    avg_past = (latest_block_time - get_block_time(BNUM)) / blocks_past
    return (blocks_past, blocks_future, avg_future, avg_past)


def get_sample(target_avg):
    prob = 1 / target_avg
    n = 0
    while True:
        r = random.random()
        n += 1
        if r < prob:
            return n

def get_avg(func, param):
    result = []
    ITERS = 1000000
    for _ in range(ITERS):
        result.append(func(param))
    return sum(result) / len(result)

def get_runout(num_blocks, avg_time):
    time = 0
    for i in range(num_blocks):
        time += get_sample(avg_time)
    return time

def num_samples_in_range(target_avg, num_blocks, range_start, range_end, num_samples):
    result = 0
    for _ in range(num_samples):
        single_result = get_runout(num_blocks, target_avg) / num_blocks
        if range_start <= single_result < range_end:
            result += 1
    return result

def build_target_dist_inc(sample_avg, num_blocks, num_samples):
    gap = 0.001
    K = 20
    result = {}
    for i in range(-K, K + 1):
        target_avg = sample_avg + sample_avg * gap * i
        num = num_samples_in_range(
                target_avg,
                num_blocks,
                range_start=(sample_avg * (1 - gap)),
                range_end=(sample_avg * (1 + gap)),
                num_samples=num_samples)
        result[target_avg] = num
    return result

def print_dist(dist):
    for k in sorted(dist.keys()):
        print(k, dist[k])

def prob_faster(target_avg, num_blocks, needed_avg, num_samples):
    result = 0
    for _ in range(num_samples):
        single_avg = get_runout(num_blocks, target_avg) / num_blocks
        if single_avg < needed_avg:
            result += 1
    return (result, num_samples)

def dist_faster(distribution, num_blocks, needed_avg, num_samples):
    probs_faster = {}
    for target_avg in distribution:
        result = prob_faster(target_avg, num_blocks, needed_avg, num_samples)
        probs_faster[target_avg] = result
    return probs_faster

def build_target_dist(past_blocks, future_blocks, avg_past, avg_future):

    def combine(small, big):
        for k in small:
            big[k] = big.get(k, 0) + small[k]

    def combine_tuple(small, big):
        for k in small:
            inc1, total1 = small[k]
            inc2, total2 = big.get(k, (0, 0))
            big[k] = (inc1 + inc2, total1 + total2)

    def normalized(dist):
        total = sum(dist.values())
        result = {}
        for k in dist:
            result[k] = dist[k] / total
        return result

    def get_final_prob(norm, faster_dist):
        total = 0
        for k in norm:
            faster_success, faster_total = faster_dist[k]
            total += norm[k] * faster_success / faster_total
        return total

    def print_combined(norm, faster):
        print("equilibrium_time, probability_based_on_seen_data, probability_of_eip_king_winning")
        for k in sorted(norm.keys()):
            print(', '.join(str(x) for x in (k, norm[k], faster[k][0] / faster[k][1])))

    faster_dist = {}

    N = 1000
    num_samples = 10
    distribution = {}
    for _ in range(N):
        inc_dist = build_target_dist_inc(avg_past, past_blocks, num_samples)
        combine(inc_dist, distribution)
        norm = normalized(distribution)
        cur_faster = dist_faster(norm, future_blocks, avg_future, num_samples)
        combine_tuple(cur_faster, faster_dist)
        final_prob = get_final_prob(norm, faster_dist)
        print_combined(norm, faster_dist)
        print(f"Overall probability: {final_prob}")
        print('*' * 80)


params = get_params_10am()
blocks_past, blocks_future, avg_future, avg_past = params
print(params)

build_target_dist(blocks_past, blocks_future, avg_past, avg_future)