This is an analysis of the probability of the London hard fork happening after August 4th at 23:59 ET
Here's the output of the simulation as of July 14. In 99.7% of the simulations, the fork happens after midnight. The most common hour when it happens is between 3am and 4am; this happens 28.4% of the time. The 25th and 75th percentiles are roughly 2.87 and 4.68 hours past midnight.
percent success 0.99753 avg margin hours 3.773372597222222 25th percentile: 2.8722222222222222 75th percentile 4.680833333333333 -3 1 -2 20 -1 226 0 1729 1 7410 2 18782 3 28439 4 25417 5 13106 6 4050 7 726 8 87 9 7
The way it works is first we download all the blocks from the past month. It assumes that the block time distribution will remain the same in the future as it was in the past. We then use all the observed block times to run a Monte Carlo simulation of when the actual fork will happen.
import datetime
import json
import math
import random
import requests
import time
TARGET_BLOCK = 12965000
TARGET_TIME = 1628135999 #August 5th at midnight ET
GETH_NODE = None #Change this
def make_req(params):
params = json.dumps(params)
headers = {'Content-Type': 'application/json'}
while True:
try:
r = requests.post(GETH_NODE, headers=headers, data=params)
return json.loads(r.text)
except Exception as e:
print("EXCEPTION!!!")
print(e)
time.sleep(10)
print("done sleeping")
def latest_block():
params = {"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":83}
res = make_req(params)
return int(res['result'], 16)
def get_block_time(block_num):
params = {"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":[hex(block_num), True],"id":1}
res = make_req(params)
return int(res['result']['timestamp'], 16)
def print_time(epoch, label):
if False:
return
e = datetime.datetime.utcfromtimestamp(epoch)
print(label, e)
def load_cache():
with open("block_data.json", 'r') as f:
data = json.load(f)
block_data = {}
for k in data:
block_data[int(k)] = data[k]
return block_data
def download_blocks():
print("starting download")
block_data = load_cache()
latest_block_num = latest_block()
blocks_to_go = TARGET_BLOCK - latest_block_num
past_block_num = latest_block_num - blocks_to_go
i = 0
for cur_block in range(past_block_num, latest_block_num):
if i % 100 == 0:
print(i)
if i % 10000 == 0:
print('blocks to go', latest_block_num - cur_block)
with open('block_data.json', 'w') as f:
json.dump(block_data, f)
print("done writing to file")
if cur_block not in block_data:
block_data[cur_block] = get_block_time(cur_block)
i += 1
with open('block_data.json', 'w') as f:
json.dump(block_data, f)
def analyze():
# Run a monte Carlo Simulation
block_data = load_cache()
largest_block = max(block_data.keys())
smallest_block = min(block_data.keys())
blocks_to_go = TARGET_BLOCK - largest_block
past_block_num = largest_block - blocks_to_go
print('past 1', past_block_num)
past_block_num = max(past_block_num, smallest_block)
print('smallest block', smallest_block)
print('blocks to go', blocks_to_go)
times = []
for cur_block_num in range(past_block_num + 1, largest_block + 1):
diff = block_data[cur_block_num] - block_data[cur_block_num - 1]
times.append(diff)
def simulate():
cur_time = block_data[largest_block]
for i in range(blocks_to_go):
cur_time += random.choice(times)
return cur_time - TARGET_TIME
def get_percentile(arr, percentile):
return sorted(arr)[int(len(arr) * percentile)]
def get_prob(percentile1, percentile2):
ITERS = 1000
success = 0
hour_success = 0
histogram = {}
results = []
for i in range(ITERS):
result = simulate()
print(result / 3600)
results.append(result)
if result > 0:
success += 1
hour = math.floor(result / 3600)
histogram[hour] = histogram.get(hour, 0) + 1
avg = sum(results) / len(results)
return (success / ITERS,
avg,
get_percentile(results, percentile1),
get_percentile(results, percentile2),
histogram)
return get_prob(0.25, 0.75)
def calculate_margin():
latest_block_num = latest_block()
blocks_to_go = TARGET_BLOCK - latest_block_num
in_the_past_block = latest_block_num - blocks_to_go
print_time(TARGET_TIME, 'target')
latest_time = get_block_time(latest_block_num)
past_block = get_block_time(in_the_past_block)
print_time(latest_time, 'latest')
print_time(past_block, 'past')
time_diff = latest_time - past_block
estimate = time_diff + latest_time
print_time(estimate, 'estimate')
margin = estimate - TARGET_TIME
return margin
download_blocks()
percent_success, avg, p1, p2, histogram = analyze()
print('percent success', percent_success)
print('avg margin hours', avg / 3600)
print('25th percentile', p1 / 3600)
print('75th percentile', p2 / 3600)
for hour in sorted(histogram.keys()):
print(hour, histogram[hour])
import requests
import time
import json
import random
NUM_SAMPLES = 10
TARGET_BLOCK = 12965000
TARGET_TIME_MIDNIGHT = 1628135999 #August 5th at midnight ET
TARGET_TIME_10am = 1628135999 + 3600 * 10
BNUM = 12900000 + 2048 * 2
BET_BLOCK = 12918600
def make_req(params):
params = json.dumps(params)
headers = {'Content-Type': 'application/json'}
while True:
try:
r = requests.post('geth node', headers=headers, data=params)
return json.loads(r.text)
except Exception as e:
print("EXCEPTION!!!")
print(e)
time.sleep(10)
print("done sleeping")
def latest_block():
params = {"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":83}
res = make_req(params)
return int(res['result'], 16)
def get_block_time(block_num):
params = {"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":[hex(block_num), True],"id":1}
res = make_req(params)
return int(res['result']['timestamp'], 16)
def get_params_bet():
latest = latest_block()
latest_block_time = get_block_time(latest)
blocks_past = latest - BNUM
blocks_future = TARGET_BLOCK - latest
num_bet_blocks = TARGET_BLOCK - BET_BLOCK
target_time = get_block_time(BET_BLOCK) + 13.70 * num_bet_blocks
avg_future = (target_time - latest_block_time) / blocks_future
avg_past = (latest_block_time - get_block_time(BNUM)) / blocks_past
return (blocks_past, blocks_future, avg_future, avg_past)
def get_params_10am():
latest = latest_block()
latest_block_time = get_block_time(latest)
blocks_past = latest - BNUM
blocks_future = TARGET_BLOCK - latest
avg_future = (TARGET_TIME_10am - latest_block_time) / blocks_future
avg_past = (latest_block_time - get_block_time(BNUM)) / blocks_past
return (blocks_past, blocks_future, avg_future, avg_past)
def get_sample(target_avg):
prob = 1 / target_avg
n = 0
while True:
r = random.random()
n += 1
if r < prob:
return n
def get_avg(func, param):
result = []
ITERS = 1000000
for _ in range(ITERS):
result.append(func(param))
return sum(result) / len(result)
def get_runout(num_blocks, avg_time):
time = 0
for i in range(num_blocks):
time += get_sample(avg_time)
return time
def num_samples_in_range(target_avg, num_blocks, range_start, range_end, num_samples):
result = 0
for _ in range(num_samples):
single_result = get_runout(num_blocks, target_avg) / num_blocks
if range_start <= single_result < range_end:
result += 1
return result
def build_target_dist_inc(sample_avg, num_blocks, num_samples):
gap = 0.001
K = 20
result = {}
for i in range(-K, K + 1):
target_avg = sample_avg + sample_avg * gap * i
num = num_samples_in_range(
target_avg,
num_blocks,
range_start=(sample_avg * (1 - gap)),
range_end=(sample_avg * (1 + gap)),
num_samples=num_samples)
result[target_avg] = num
return result
def print_dist(dist):
for k in sorted(dist.keys()):
print(k, dist[k])
def prob_faster(target_avg, num_blocks, needed_avg, num_samples):
result = 0
for _ in range(num_samples):
single_avg = get_runout(num_blocks, target_avg) / num_blocks
if single_avg < needed_avg:
result += 1
return (result, num_samples)
def dist_faster(distribution, num_blocks, needed_avg, num_samples):
probs_faster = {}
for target_avg in distribution:
result = prob_faster(target_avg, num_blocks, needed_avg, num_samples)
probs_faster[target_avg] = result
return probs_faster
def build_target_dist(past_blocks, future_blocks, avg_past, avg_future):
def combine(small, big):
for k in small:
big[k] = big.get(k, 0) + small[k]
def combine_tuple(small, big):
for k in small:
inc1, total1 = small[k]
inc2, total2 = big.get(k, (0, 0))
big[k] = (inc1 + inc2, total1 + total2)
def normalized(dist):
total = sum(dist.values())
result = {}
for k in dist:
result[k] = dist[k] / total
return result
def get_final_prob(norm, faster_dist):
total = 0
for k in norm:
faster_success, faster_total = faster_dist[k]
total += norm[k] * faster_success / faster_total
return total
def print_combined(norm, faster):
print("equilibrium_time, probability_based_on_seen_data, probability_of_eip_king_winning")
for k in sorted(norm.keys()):
print(', '.join(str(x) for x in (k, norm[k], faster[k][0] / faster[k][1])))
faster_dist = {}
N = 1000
num_samples = 10
distribution = {}
for _ in range(N):
inc_dist = build_target_dist_inc(avg_past, past_blocks, num_samples)
combine(inc_dist, distribution)
norm = normalized(distribution)
cur_faster = dist_faster(norm, future_blocks, avg_future, num_samples)
combine_tuple(cur_faster, faster_dist)
final_prob = get_final_prob(norm, faster_dist)
print_combined(norm, faster_dist)
print(f"Overall probability: {final_prob}")
print('*' * 80)
params = get_params_10am()
blocks_past, blocks_future, avg_future, avg_past = params
print(params)
build_target_dist(blocks_past, blocks_future, avg_past, avg_future)