Source code for r2b2.simulation.minerva

import math
import random as r
from typing import List
from typing import Tuple

from r2b2.minerva import Minerva
from r2b2.simulator import Simulation
from r2b2.simulator import histogram


class MinervaOneRoundRisk(Simulation):
    """Estimate the risk of a single-round Minerva audit by simulation.

    Each trial samples ``sample_size`` ballots from a tied underlying
    distribution and runs one round of Minerva; the share of trials in which
    the audit stops is returned by :meth:`analyze` as the experimental risk.
    """
    sample_size: int
    total_relevant_ballots: int
    vote_dist: List[Tuple[str, int]]
    audit: Minerva

    def __init__(self,
                 alpha,
                 reported,
                 sample_size,
                 db_mode=True,
                 db_host='localhost',
                 db_name='r2b2',
                 db_port=27017,
                 user='writer',
                 pwd='icanwrite',
                 *args,
                 **kwargs):
        """Configure the simulation and its underlying (tied) vote distribution.

        Args:
            alpha: Risk limit handed to the Minerva audit.
            reported: Reported contest; its ``tally``, ``contest_ballots``,
                ``reported_winners`` and ``winner_prop`` attributes are used.
            sample_size: Number of ballots drawn in every trial.
            db_mode, db_host, db_name, db_port, user, pwd: Forwarded to the
                ``Simulation`` constructor.

        Raises:
            ValueError: If ``sample_size`` is below the audit's minimum.
        """
        # NOTE: the base constructor takes db_port *before* db_name.
        super().__init__('minerva', alpha, reported, 'tie', True, db_mode, db_host, db_port, db_name, user, pwd,
                         *args, **kwargs)
        self.sample_size = sample_size
        self.total_relevant_ballots = sum(self.reported.tally.values())

        # FIXME: temporary until pairwise contest fix is implemented
        # Remember the true contest size, then make the contest look as if it
        # only contained relevant ballots before building the audit.
        self.contest_ballots = self.reported.contest_ballots
        self.reported.contest_ballots = self.total_relevant_ballots
        reported_winner = self.reported.reported_winners[0]
        self.reported.winner_prop = self.reported.tally[reported_winner] / self.reported.contest_ballots
        self.audit = Minerva(self.alpha, 1.0, self.reported)

        if sample_size < self.audit.min_sample_size:
            raise ValueError('Sample size is less than minimum sample size for audit.')

        # FIXME: sorted candidate list will be created by new branch, update once merged
        # Cumulative thresholds: the top candidate owns the first half of the
        # relevant ballots, everything up to total_relevant_ballots goes to the
        # next candidate, and the rest of the contest is 'invalid'.
        ranked = sorted(self.reported.tally.items(), key=lambda item: item[1], reverse=True)
        self.vote_dist = [(ranked[0][0], self.total_relevant_ballots // 2)]
        self.vote_dist.extend((name, self.total_relevant_ballots) for name, _ in ranked[1:])
        self.vote_dist.append(('invalid', self.contest_ballots))

    def trial(self, seed):
        """Run one seeded 1-round Minerva audit (using r2b2.minerva.Minerva).

        Args:
            seed: Seed for the ``random`` module, making the trial repeatable.

        Returns:
            dict with keys ``stop``, ``p_value``, ``sample_size``,
            ``relevant_sample_size`` and ``winner_ballots``.
        """
        r.seed(seed)

        # Draw the sample: each ballot is credited to the first bucket whose
        # cumulative threshold it does not exceed.
        tallies = [0] * len(self.vote_dist)
        for _ in range(self.sample_size):
            drawn = r.randint(1, self.contest_ballots)
            for bucket in range(len(tallies)):
                if drawn <= self.vote_dist[bucket][1]:
                    tallies[bucket] += 1
                    break

        winner_ballots = tallies[0]
        # The last bucket is 'invalid'; those ballots do not count.
        relevant_sample_size = self.sample_size - tallies[-1]

        # Perform audit computations
        self.audit._reset()
        self.audit.rounds.append(relevant_sample_size)
        self.audit.current_dist_null()
        self.audit.current_dist_reported()
        p_value = self.audit.compute_risk(winner_ballots, relevant_sample_size)

        return {
            'stop': bool(p_value <= self.alpha),
            'p_value': p_value,
            'sample_size': self.sample_size,
            'relevant_sample_size': relevant_sample_size,
            'winner_ballots': winner_ballots
        }

    def analyze(self, verbose: bool = False, hist: bool = False):
        """Compute the experimental risk from the recorded trials.

        Args:
            verbose (bool): If true, print a summary of the analysis.
            hist (bool): If true, display two histograms: winner ballots per
                sample and the computed p-values.

        Returns:
            Fraction of trials in which the audit stopped.
        """
        trials = self.db.trial_lookup(self.sim_id) if self.db_mode else self.trials

        num_trials = 0
        stopped = 0
        # The two totals below are accumulated but not reported yet.
        total_risk = 0
        total_relevant_sampled = 0
        winner_ballot_dist = []
        risk_dist = []
        for outcome in trials:
            num_trials += 1
            if outcome['stop']:
                stopped += 1
            total_relevant_sampled += outcome['relevant_sample_size']
            winner_ballot_dist.append(outcome['winner_ballots'])
            p_value = outcome['p_value']
            total_risk += p_value
            risk_dist.append(p_value)

        if verbose:
            print('Analysis\n========')
            print('Underlying election is tied\n')
            print('Number of trials: {}'.format(num_trials))
            print('Number of stopped: {}'.format(stopped))
            print('Risk Limit: {:%}'.format(self.alpha))
            print('Risk Computed: {:%}'.format(stopped / num_trials))

        if hist:
            histogram(winner_ballot_dist, 'Winner ballots found in sample of size: {}'.format(self.sample_size))
            histogram(risk_dist, 'Risk (p_value) dist.')

        experimental_risk = stopped / num_trials

        # Update simulation entry to include analysis
        if self.db_mode:
            self.db.update_analysis(self.sim_id, experimental_risk)
        return experimental_risk
class MinervaOneRoundStoppingProb(Simulation):
    """Estimate the stopping probability of a single-round Minerva audit.

    Each trial samples ``sample_size`` ballots from the reported vote
    distribution and runs one round of Minerva; :meth:`analyze` returns the
    share of trials in which the audit stopped.
    """
    sample_size: int
    total_relevant_ballots: int
    vote_dist: List[Tuple[str, int]]
    audit: Minerva

    def __init__(self,
                 alpha,
                 reported,
                 sample_size,
                 db_mode=True,
                 db_host='localhost',
                 db_name='r2b2',
                 db_port=27017,
                 user='writer',
                 pwd='icanwrite',
                 *args,
                 **kwargs):
        """Configure the simulation and the reported vote distribution.

        Args:
            alpha: Risk limit handed to the Minerva audit.
            reported: Reported contest; its ``tally``, ``contest_ballots``,
                ``reported_winners`` and ``winner_prop`` attributes are used.
            sample_size: Number of ballots drawn in every trial.
            db_mode, db_host, db_name, db_port, user, pwd: Forwarded to the
                ``Simulation`` constructor.

        Raises:
            ValueError: If ``sample_size`` is below the audit's minimum.
        """
        # NOTE: the base constructor takes db_port *before* db_name.
        super().__init__('minerva', alpha, reported, 'reported', True, db_mode, db_host, db_port, db_name, user,
                         pwd, *args, **kwargs)
        self.sample_size = sample_size
        self.total_relevant_ballots = sum(self.reported.tally.values())

        # FIXME: temporary until pairwise contest fix is implemented
        # Remember the true contest size, then make the contest look as if it
        # only contained relevant ballots before building the audit.
        self.contest_ballots = self.reported.contest_ballots
        self.reported.contest_ballots = self.total_relevant_ballots
        reported_winner = self.reported.reported_winners[0]
        self.reported.winner_prop = self.reported.tally[reported_winner] / self.reported.contest_ballots
        self.audit = Minerva(self.alpha, 1.0, self.reported)

        if sample_size < self.audit.min_sample_size:
            raise ValueError('Sample size is less than minimum sample size for audit')

        # FIXME: sorted candidate list will be created by new branch, update once merged
        # Cumulative thresholds over the candidates, largest tally first,
        # followed by an 'invalid' bucket reaching the full contest size.
        ranked = sorted(self.reported.tally.items(), key=lambda item: item[1], reverse=True)
        running_total = 0
        self.vote_dist = []
        for name, votes in ranked:
            running_total += votes
            self.vote_dist.append((name, running_total))
        self.vote_dist.append(('invalid', self.contest_ballots))

    def trial(self, seed):
        """Run one seeded 1-round Minerva audit.

        Args:
            seed: Seed for the ``random`` module, making the trial repeatable.

        Returns:
            dict with keys ``stop``, ``p_value``, ``sample_size``,
            ``relevant_sample_size`` and ``winner_ballots``.
        """
        r.seed(seed)

        # Draw the sample: each ballot is credited to the first bucket whose
        # cumulative threshold it does not exceed.
        tallies = [0] * len(self.vote_dist)
        for _ in range(self.sample_size):
            drawn = r.randint(1, self.contest_ballots)
            for bucket in range(len(tallies)):
                if drawn <= self.vote_dist[bucket][1]:
                    tallies[bucket] += 1
                    break

        winner_ballots = tallies[0]
        # The last bucket is 'invalid'; those ballots do not count.
        relevant_sample_size = self.sample_size - tallies[-1]

        # Perform audit computations
        self.audit._reset()
        self.audit.rounds.append(relevant_sample_size)
        self.audit.current_dist_null()
        self.audit.current_dist_reported()
        p_value = self.audit.compute_risk(winner_ballots, relevant_sample_size)

        return {
            'stop': bool(p_value <= self.alpha),
            'p_value': p_value,
            'sample_size': self.sample_size,
            'relevant_sample_size': relevant_sample_size,
            'winner_ballots': winner_ballots
        }

    def analyze(self, verbose: bool = False, hist: bool = False):
        """Compute the experimental stopping probability from the trials.

        Args:
            verbose (bool): Accepted for interface parity; currently unused.
            hist (bool): Accepted for interface parity; currently unused.

        Returns:
            Fraction of trials in which the audit stopped.
        """
        trials = self.db.trial_lookup(self.sim_id) if self.db_mode else self.trials

        num_trials = 0
        stopped = 0
        # Collected for the (not yet implemented) verbose/histogram output.
        winner_ballot_dist = []
        risk_dist = []
        for outcome in trials:
            num_trials += 1
            if outcome['stop']:
                stopped += 1
            winner_ballot_dist.append(outcome['winner_ballots'])
            risk_dist.append(outcome['p_value'])

        # TODO: insert verbose and histograms

        stopping_prob = stopped / num_trials

        # Update simulation entry to include analysis
        if self.db_mode:
            self.db.update_analysis(self.sim_id, stopping_prob)
        return stopping_prob
class MinervaOneRoundAlteredMargin(Simulation):
    """Simulate a 1-round Minerva audit whose outcome is right but whose margin is misreported.

    Ballots are drawn from an underlying distribution in which the reported
    winner holds ``(1 + underlying_margin) / 2`` of the relevant ballots,
    while the audit itself is built from the reported tally.
    """
    underlying_margin: float
    sample_size: int
    total_relevant_ballots: int
    # NOTE: the first threshold is a float here (ballots * proportion).
    vote_dist: List[Tuple[str, int]]
    audit: Minerva

    def __init__(self,
                 alpha,
                 reported,
                 underlying,
                 underlying_margin,
                 sample_size,
                 db_mode=True,
                 db_host='localhost',
                 db_name='r2b2',
                 db_port=27017,
                 user='writer',
                 pwd='icanwrite',
                 *args,
                 **kwargs):
        """Configure the simulation and the altered underlying distribution.

        Args:
            alpha: Risk limit handed to the Minerva audit.
            reported: Reported contest; its ``tally``, ``contest_ballots``,
                ``reported_winners`` and ``winner_prop`` attributes are used.
            underlying: Stored under the ``'change'`` key of the underlying
                description passed to the ``Simulation`` constructor.
            underlying_margin: True margin of the reported winner; stored
                under the ``'margin'`` key and used to build ``vote_dist``.
            sample_size: Number of ballots drawn in every trial.
            db_mode, db_host, db_name, db_port, user, pwd: Forwarded to the
                ``Simulation`` constructor.

        Raises:
            ValueError: If ``sample_size`` is below the audit's minimum.
        """
        underlying_desc = {'change': underlying, 'margin': underlying_margin}
        # NOTE: the base constructor takes db_port *before* db_name.
        super().__init__('minerva', alpha, reported, underlying_desc, True, db_mode, db_host, db_port, db_name,
                         user, pwd, *args, **kwargs)
        self.underlying_margin = underlying_margin
        self.sample_size = sample_size
        self.total_relevant_ballots = sum(self.reported.tally.values())

        # FIXME: temporary until pairwise contest fix is implemented
        # Remember the true contest size, then make the contest look as if it
        # only contained relevant ballots before building the audit.
        self.contest_ballots = self.reported.contest_ballots
        self.reported.contest_ballots = self.total_relevant_ballots
        reported_winner = self.reported.reported_winners[0]
        self.reported.winner_prop = self.reported.tally[reported_winner] / self.reported.contest_ballots
        self.audit = Minerva(self.alpha, 1.0, self.reported)

        if sample_size < self.audit.min_sample_size:
            raise ValueError('Sample size is less than minimum sample size for audit')

        # FIXME: sorted candidate list will be created by new branch, update once merged
        # Cumulative thresholds: the winner owns the first
        # total_relevant_ballots * underlying_winner_prop ballots, everything
        # up to total_relevant_ballots goes to the next candidate, and the
        # rest of the contest is 'invalid'.
        ranked = sorted(self.reported.tally.items(), key=lambda item: item[1], reverse=True)
        underlying_winner_prop = (1.0 + underlying_margin) / 2.0
        self.vote_dist = [(ranked[0][0], self.total_relevant_ballots * underlying_winner_prop)]
        self.vote_dist.extend((name, self.total_relevant_ballots) for name, _ in ranked[1:])
        self.vote_dist.append(('invalid', self.contest_ballots))

    def trial(self, seed):
        """Run one seeded 1-round Minerva audit.

        Args:
            seed: Seed for the ``random`` module, making the trial repeatable.

        Returns:
            dict with keys ``stop``, ``p_value``, ``sample_size``,
            ``relevant_sample_size`` and ``winner_ballots``.
        """
        r.seed(seed)

        # Draw the sample: each ballot is credited to the first bucket whose
        # cumulative threshold it does not exceed.
        tallies = [0] * len(self.vote_dist)
        for _ in range(self.sample_size):
            drawn = r.randint(1, self.contest_ballots)
            for bucket in range(len(tallies)):
                if drawn <= self.vote_dist[bucket][1]:
                    tallies[bucket] += 1
                    break

        winner_ballots = tallies[0]
        # The last bucket is 'invalid'; those ballots do not count.
        relevant_sample_size = self.sample_size - tallies[-1]

        # Perform audit computations
        self.audit._reset()
        self.audit.rounds.append(relevant_sample_size)
        self.audit.current_dist_null()
        self.audit.current_dist_reported()
        p_value = self.audit.compute_risk(winner_ballots, relevant_sample_size)

        return {
            'stop': bool(p_value <= self.alpha),
            'p_value': p_value,
            'sample_size': self.sample_size,
            'relevant_sample_size': relevant_sample_size,
            'winner_ballots': winner_ballots
        }

    def analyze(self, verbose: bool = False, hist: bool = False):
        """Compute the average p-value and experimental stopping probability.

        Args:
            verbose (bool): Accepted for interface parity; currently unused.
            hist (bool): Accepted for interface parity; currently unused.

        Returns:
            dict with ``avg_p_value`` (mean p-value over all trials) and
            ``sprob`` (fraction of trials in which the audit stopped).
        """
        trials = self.db.trial_lookup(self.sim_id) if self.db_mode else self.trials

        num_trials = 0
        stopped = 0
        total_risk = 0.0
        # Collected for the (not yet implemented) histogram output.
        winner_ballot_dist = []
        for outcome in trials:
            num_trials += 1
            total_risk += outcome['p_value']
            if outcome['stop']:
                stopped += 1
            winner_ballot_dist.append(outcome['winner_ballots'])

        # TODO: insert verbose and histograms

        # Build the result unconditionally: it used to be created only in
        # db_mode, which made the return below fail for in-memory runs.
        analysis = {'avg_p_value': (total_risk / num_trials), 'sprob': (stopped / num_trials)}

        # Update simulation entry to include analysis
        if self.db_mode:
            self.db.update_analysis(self.sim_id, analysis)
        return analysis
class MinervaMultiRoundStoppingProb(Simulation):
    """Simulate a multi-round Minerva audit.

    If sample_sprob is provided, sample sizes to achieve a sample_sprob
    probability of stopping will be computed and used. Otherwise, the initial
    sample size, sample_size, is given as input and further sample sizes are
    an additional (sample_mult) * (sample_size) ballots.

    Ballots are drawn from the reported vote distribution. The audit executes
    until it stops or reaches the maximum number of rounds.
    """
    sample_sprob: float
    sample_size: int
    sample_mult: float
    max_rounds: int
    total_relevant_ballots: int
    vote_dist: List[Tuple[str, int]]
    audit: Minerva

    def __init__(self,
                 alpha,
                 reported,
                 max_rounds,
                 sample_size=None,
                 sample_mult=None,
                 sample_sprob=None,
                 db_mode=True,
                 db_host='localhost',
                 db_name='r2b2',
                 db_port=27017,
                 user='writer',
                 pwd='icanwrite',
                 *args,
                 **kwargs):
        """Configure the simulation and the reported vote distribution.

        Args:
            alpha: Risk limit handed to the Minerva audit.
            reported: Reported contest; its ``tally`` and ``contest_ballots``
                attributes are used.
            max_rounds: Maximum number of rounds per trial (at least 2).
            sample_size: First-round sample size for the fixed schedule.
            sample_mult: Multiple of ``sample_size`` added in each later
                round of the fixed schedule.
            sample_sprob: Target stopping probability per round; when given,
                round sizes come from ``Minerva.next_sample_size``.
            db_mode, db_host, db_name, db_port, user, pwd: Forwarded to the
                ``Simulation`` constructor.

        Raises:
            ValueError: If neither ``sample_sprob`` nor both ``sample_size``
                and ``sample_mult`` are given, if ``sample_sprob`` is not
                strictly between 0 and 1, if ``sample_size`` is below the
                largest pairwise minimum sample size, or if
                ``max_rounds < 2``.
        """
        # Add parameters to simulation DB entry
        kwargs.setdefault('sim_args', {}).update({
            'max_rounds': max_rounds,
            'sample_mult': sample_mult,
            'sample_sprob': sample_sprob
        })
        # NOTE: the base constructor takes db_port *before* db_name.
        super().__init__('minerva', alpha, reported, 'reported', True, db_mode, db_host, db_port, db_name, user,
                         pwd, *args, **kwargs)
        self.sample_sprob = sample_sprob
        self.sample_size = sample_size
        self.sample_mult = sample_mult
        self.max_rounds = max_rounds
        self.total_relevant_ballots = sum(self.reported.tally.values())

        # FIXME: temporary until pairwise contest fix is implemented
        self.contest_ballots = self.reported.contest_ballots
        self.audit = Minerva(self.alpha, 1.0, self.reported)

        # A fixed schedule needs BOTH sample_size and sample_mult; checking
        # only "all three missing" let half-specified schedules through and
        # they failed later with a TypeError.
        if sample_sprob is None and (sample_size is None or sample_mult is None):
            raise ValueError('Sample sizes cannot be chosen without sample_sprob or sample_size and sample_mult.')
        if sample_sprob is not None:
            if not 0 < sample_sprob < 1:
                raise ValueError('Sample size stopping probability is not between 0 and 1.')
        else:
            min_sample_size = 0
            for pairwise_audit in self.audit.sub_audits.values():
                min_sample_size = max(pairwise_audit.min_sample_size, min_sample_size)
            if sample_size < min_sample_size:
                raise ValueError('Sample size is less than minimum sample size for audit.')
        if max_rounds < 2:
            raise ValueError('Maximum rounds is too small.')

        # FIXME: sorted candidate list will be created by new branch, update once merged
        # Cumulative thresholds over the candidates, largest tally first,
        # followed by an 'invalid' bucket reaching the full contest size.
        ranked = sorted(self.reported.tally.items(), key=lambda item: item[1], reverse=True)
        running_total = 0
        self.vote_dist = []
        for name, votes in ranked:
            running_total += votes
            self.vote_dist.append((name, running_total))
        self.vote_dist.append(('invalid', self.contest_ballots))

    def _trial_output(self, stop, round_num):
        """Bundle the audit's current state into the trial result dict."""
        # FIXME: Improve output format
        return {
            'stop': stop,
            'round': round_num,
            'p_value_sched': self.audit.pvalue_schedule,
            'p_value': self.audit.get_risk_level(),
            'relevant_sample_size_sched': self.audit.rounds,
            'winner_ballots_drawn_sched': self.audit.sample_ballots
        }

    def trial(self, seed):
        """Execute a multiround minerva audit (using r2b2.minerva.Minerva).

        Args:
            seed: Seed for the ``random`` module, making the trial repeatable.

        Returns:
            dict with keys ``stop``, ``round``, ``p_value_sched``,
            ``p_value``, ``relevant_sample_size_sched`` and
            ``winner_ballots_drawn_sched``. ``round`` is the stopping round,
            or ``max_rounds`` if the audit never stopped.
        """
        r.seed(seed)

        # Ensure audit is reset
        self.audit._reset()

        # Initialize first round including initial sample size
        use_sprob = self.sample_sprob is not None
        if use_sprob:
            current_sample_size = self.audit.next_sample_size(self.sample_sprob)
        else:
            current_sample_size = self.sample_size
            # Constant number of ballots added in every later round.
            increment = math.ceil(self.sample_mult * self.sample_size)
        previous_sample_size = 0
        round_num = 1
        stop = False

        # Cumulative per-bucket counts across all rounds of this trial.
        tallies = [0] * len(self.vote_dist)
        while round_num <= self.max_rounds:
            if current_sample_size <= previous_sample_size:
                # TODO figure out when this happens and prevent it
                current_sample_size = previous_sample_size + 1

            # Draw only the ballots that are new in this round; each goes to
            # the first bucket whose cumulative threshold it does not exceed.
            for _ in range(current_sample_size - previous_sample_size):
                drawn = r.randint(1, self.contest_ballots)
                for bucket in range(len(tallies)):
                    if drawn <= self.vote_dist[bucket][1]:
                        tallies[bucket] += 1
                        break

            # For now, the irrelevant ('invalid') votes are left out.
            sample_dict = {
                name: count
                for (name, _), count in zip(self.vote_dist, tallies) if name != 'invalid'
            }

            # Execute a round of the audit for this sample
            stop = self.audit.execute_round(current_sample_size, sample_dict)
            if stop:
                return self._trial_output(stop, round_num)

            # Else choose a next round size and continue
            round_num += 1
            previous_sample_size = current_sample_size
            if use_sprob:
                current_sample_size = self.audit.next_sample_size(self.sample_sprob)
            else:
                current_sample_size += increment

        # If audit does not stop, return trial output
        return self._trial_output(stop, self.max_rounds)

    def analyze(self, verbose: bool = False, hist: bool = False):
        """Analyze trials to get experimental stopping probability.

        Args:
            verbose (bool): If true, analyze will print simulation analysis
                information.
            hist (bool): If true, analyze will display a histogram of the
                rounds reached in stopped trials.

        Returns:
            dict with ``sprob`` (overall stopping probability),
            ``sprob_by_round`` (stopped / remaining per round, ``-1`` when no
            trials remained), ``remaining_by_round`` and ``stopped_by_round``.
        """
        trials = self.db.trial_lookup(self.sim_id) if self.db_mode else self.trials

        num_trials = 0
        stopped = 0
        rounds_stopped = []
        # TODO: Create additinal structures to store trial data
        for outcome in trials:
            num_trials += 1
            if outcome['stop']:
                stopped += 1
                rounds_stopped.append(outcome['round'])
            # TODO: Extract more data from trial

        if verbose:
            print('Analysis\n========\n')
            print('Number of trials: {}'.format(num_trials))
            print('Experiemtnal Stopping Prob: {:.5f}'.format(stopped / num_trials))
            if stopped > 0:
                print('Average Rounds in Stopped Trials: {:.2f}'.format(sum(rounds_stopped) / stopped))

        if hist:
            histogram(rounds_stopped, 'Rounds reached in stopped trials.')

        # Find stopping probability for each round
        sprob_by_round = [0] * self.max_rounds
        stopped_by_round = [0] * self.max_rounds
        remaining_by_round = [0] * (self.max_rounds + 1)
        # first round has all remaining
        remaining_by_round[0] = num_trials
        for rd in range(1, self.max_rounds + 1):
            # Count trials that stopped in round rd. This used to count
            # occurrences of the random module `r`, which is always 0.
            stopped_this_round = rounds_stopped.count(rd)
            stopped_by_round[rd - 1] = stopped_this_round
            if remaining_by_round[rd - 1] != 0:
                sprob_by_round[rd - 1] = stopped_this_round / remaining_by_round[rd - 1]
            else:
                sprob_by_round[rd - 1] = -1
            remaining_by_round[rd] = remaining_by_round[rd - 1] - stopped_this_round

        analysis = {
            'sprob': stopped / num_trials,
            'sprob_by_round': sprob_by_round,
            'remaining_by_round': remaining_by_round,
            'stopped_by_round': stopped_by_round
        }

        # Update simulation entry to include analysis
        if self.db_mode:
            self.db.update_analysis(self.sim_id, analysis)

        return analysis
class MinervaMultiRoundRisk(Simulation):
    """Simulate a multi-round Minerva audit on a tied underlying election.

    If sample_sprob is provided, sample sizes to achieve a sample_sprob
    probability of stopping will be computed and used. Otherwise, the initial
    sample size, x, is given as input and further sample sizes are an
    additional (sample_mult) * x ballots.

    The audit executes until it stops or reaches the maximum number of rounds.
    """
    sample_sprob: float
    sample_size: int
    sample_mult: float
    max_rounds: int
    total_relevant_ballots: int
    vote_dist: List[Tuple[str, int]]
    audit: Minerva

    def __init__(self,
                 alpha,
                 reported,
                 max_rounds,
                 sample_size=None,
                 sample_mult=None,
                 sample_sprob=None,
                 db_mode=True,
                 db_host='localhost',
                 db_name='r2b2',
                 db_port=27017,
                 user='writer',
                 pwd='icanwrite',
                 *args,
                 **kwargs):
        """Configure the simulation and its underlying (tied) vote distribution.

        Args:
            alpha: Risk limit handed to the Minerva audit.
            reported: Reported contest; its ``tally`` and ``contest_ballots``
                attributes are used.
            max_rounds: Maximum number of rounds per trial (at least 2).
            sample_size: First-round sample size for the fixed schedule.
            sample_mult: Multiple of ``sample_size`` added in each later
                round of the fixed schedule.
            sample_sprob: Target stopping probability per round; when given,
                round sizes come from ``Minerva.next_sample_size``.
            db_mode, db_host, db_name, db_port, user, pwd: Forwarded to the
                ``Simulation`` constructor.

        Raises:
            ValueError: If neither ``sample_sprob`` nor both ``sample_size``
                and ``sample_mult`` are given, if ``sample_sprob`` is not
                strictly between 0 and 1, if ``sample_size`` is below the
                largest pairwise minimum sample size, or if
                ``max_rounds < 2``.
        """
        # Add parameters to simulation DB entry
        kwargs.setdefault('sim_args', {}).update({
            'max_rounds': max_rounds,
            'sample_mult': sample_mult,
            'sample_sprob': sample_sprob
        })
        # NOTE: the base constructor takes db_port *before* db_name.
        super().__init__('minerva', alpha, reported, 'tie', True, db_mode, db_host, db_port, db_name, user, pwd,
                         *args, **kwargs)
        self.sample_size = sample_size
        self.sample_mult = sample_mult
        self.sample_sprob = sample_sprob
        self.max_rounds = max_rounds
        self.total_relevant_ballots = sum(self.reported.tally.values())

        # FIXME: temporary until pairwise contest fix is implemented
        self.contest_ballots = self.reported.contest_ballots
        self.audit = Minerva(self.alpha, 1.0, self.reported)

        # A fixed schedule needs BOTH sample_size and sample_mult; checking
        # only "all three missing" let half-specified schedules through and
        # they failed later with a TypeError.
        if sample_sprob is None and (sample_size is None or sample_mult is None):
            raise ValueError('Sample sizes cannot be chosen without sample_sprob or sample_size and sample_mult.')
        if sample_sprob is not None:
            if not 0 < sample_sprob < 1:
                raise ValueError('Sample size stopping probability is not between 0 and 1.')
        else:
            min_sample_size = 0
            for pairwise_audit in self.audit.sub_audits.values():
                min_sample_size = max(pairwise_audit.min_sample_size, min_sample_size)
            if sample_size < min_sample_size:
                raise ValueError('Sample size is less than minimum sample size for audit.')
        if max_rounds < 2:
            raise ValueError('Maximum rounds is too small.')

        # FIXME: sorted candidate list will be created by new branch, update once merged
        # Cumulative thresholds for a tied election: the top candidate owns
        # the first half of the relevant ballots, everything up to
        # total_relevant_ballots goes to the next candidate, and the rest of
        # the contest is 'invalid'.
        ranked = sorted(self.reported.tally.items(), key=lambda item: item[1], reverse=True)
        self.vote_dist = [(ranked[0][0], self.total_relevant_ballots // 2)]
        self.vote_dist.extend((name, self.total_relevant_ballots) for name, _ in ranked[1:])
        self.vote_dist.append(('invalid', self.contest_ballots))

    def _trial_output(self, stop, round_num):
        """Bundle the audit's current state into the trial result dict."""
        # FIXME: Improve output format
        return {
            'stop': stop,
            'round': round_num,
            'p_value_sched': self.audit.pvalue_schedule,
            'p_value': self.audit.get_risk_level(),
            'relevant_sample_size_sched': self.audit.rounds,
            'winner_ballots_drawn_sched': self.audit.sample_ballots,
        }

    def trial(self, seed):
        """Execute a multiround minerva audit (using r2b2.minerva.Minerva).

        Args:
            seed: Seed for the ``random`` module, making the trial repeatable.

        Returns:
            dict with keys ``stop``, ``round``, ``p_value_sched``,
            ``p_value``, ``relevant_sample_size_sched`` and
            ``winner_ballots_drawn_sched``. ``round`` is the stopping round,
            or ``max_rounds`` if the audit never stopped.
        """
        r.seed(seed)

        # Ensure audit is reset
        self.audit._reset()

        # Initialize first round including initial sample size
        fixed_schedule = self.sample_sprob is None
        if fixed_schedule:
            current_sample_size = self.sample_size
            # Constant number of ballots added in every later round.
            increment = math.ceil(self.sample_mult * self.sample_size)
        previous_sample_size = 0
        round_num = 1
        stop = False

        # Cumulative per-bucket counts across all rounds of this trial.
        tallies = [0] * len(self.vote_dist)
        while round_num <= self.max_rounds:
            if not fixed_schedule:
                current_sample_size = self.audit.next_sample_size(self.sample_sprob)

            # Draw only the ballots that are new in this round; each goes to
            # the first bucket whose cumulative threshold it does not exceed.
            for _ in range(current_sample_size - previous_sample_size):
                drawn = r.randint(1, self.contest_ballots)
                for bucket in range(len(tallies)):
                    if drawn <= self.vote_dist[bucket][1]:
                        tallies[bucket] += 1
                        break

            # For now, the irrelevant ('invalid') ballots are left out.
            sample_dict = {
                name: count
                for (name, _), count in zip(self.vote_dist, tallies) if name != 'invalid'
            }

            # Execute a round of the audit for this sample
            stop = self.audit.execute_round(current_sample_size, sample_dict)
            if stop:
                return self._trial_output(stop, round_num)

            # Else choose a next round size and continue
            round_num += 1
            previous_sample_size = current_sample_size
            if fixed_schedule:
                current_sample_size += increment

        # If audit does not stop, return trial output
        return self._trial_output(stop, self.max_rounds)

    def analyze(self, verbose: bool = False, hist: bool = False):
        """Analyze trials to get experimental risk.

        Args:
            verbose (bool): If true, analyze will print simulation analysis
                information.
            hist (bool): If true, analyze will display a histogram of the
                rounds reached in stopped trials.

        Returns:
            dict with ``risk`` (overall fraction of stopped trials),
            ``risk_by_round`` (stopped / remaining per round, ``-1`` when no
            trials remained), ``remaining_by_round`` and ``stopped_by_round``.
        """
        trials = self.db.trial_lookup(self.sim_id) if self.db_mode else self.trials

        num_trials = 0
        stopped = 0
        rounds_stopped = []
        # TODO: Create additinal structures to store trial data
        for outcome in trials:
            num_trials += 1
            if outcome['stop']:
                stopped += 1
                rounds_stopped.append(outcome['round'])
            # TODO: Extract more data from trial

        if verbose:
            print('Analysis\n========\n')
            print('Number of trials: {}'.format(num_trials))
            print('Experiemtnal Risk: {:.5f}'.format(stopped / num_trials))
            if stopped > 0:
                print('Average Rounds in Stopped Trials: {:.2f}'.format(sum(rounds_stopped) / stopped))

        if hist:
            histogram(rounds_stopped, 'Rounds reached in stopped trials.')

        # Find risk for each round
        risk_by_round = [0] * self.max_rounds
        stopped_by_round = [0] * self.max_rounds
        remaining_by_round = [0] * (self.max_rounds + 1)
        # first round has all remaining
        remaining_by_round[0] = num_trials
        for rd in range(1, self.max_rounds + 1):
            stopped_this_round = rounds_stopped.count(rd)
            stopped_by_round[rd - 1] = stopped_this_round
            if remaining_by_round[rd - 1] != 0:
                risk_by_round[rd - 1] = stopped_this_round / remaining_by_round[rd - 1]
            else:
                risk_by_round[rd - 1] = -1
            remaining_by_round[rd] = remaining_by_round[rd - 1] - stopped_this_round

        analysis = {
            'risk': stopped / num_trials,
            'risk_by_round': risk_by_round,
            'remaining_by_round': remaining_by_round,
            'stopped_by_round': stopped_by_round
        }

        # Update simulation entry to include analysis
        if self.db_mode:
            self.db.update_analysis(self.sim_id, analysis)

        return analysis
class MinervaRandomMultiRoundRisk(Simulation):
    """Simulate a multi-round Minerva audit with randomly sized later rounds.

    The first round draws ``sample_size`` ballots (x); every later round adds
    a random number of ballots between 0.5x and 1.5x. Ballots come from a
    tied underlying distribution, so the share of stopped trials is the
    experimental risk. A trial ends when the audit stops or the maximum
    number of rounds is reached.
    """
    sample_size: int
    max_rounds: int
    total_relevant_ballots: int
    vote_dist: List[Tuple[str, int]]
    audit: Minerva

    def __init__(self,
                 alpha,
                 reported,
                 sample_size,
                 max_rounds,
                 db_mode=True,
                 db_host='localhost',
                 db_name='r2b2',
                 db_port=27017,
                 user='writer',
                 pwd='icanwrite',
                 *args,
                 **kwargs):
        """Configure the simulation and its underlying (tied) vote distribution.

        Args:
            alpha: Risk limit handed to the Minerva audit.
            reported: Reported contest; its ``tally``, ``contest_ballots``,
                ``reported_winners`` and ``winner_prop`` attributes are used.
            sample_size: Number of ballots drawn in the first round.
            max_rounds: Maximum number of rounds per trial (at least 2).
            db_mode, db_host, db_name, db_port, user, pwd: Forwarded to the
                ``Simulation`` constructor.

        Raises:
            ValueError: If ``sample_size`` is below the audit's minimum or
                ``max_rounds < 2``.
        """
        # Record max_rounds in the simulation's DB arguments.
        kwargs.setdefault('sim_args', {})['max_rounds'] = max_rounds
        # NOTE: the base constructor takes db_port *before* db_name.
        super().__init__('minerva', alpha, reported, 'tie', True, db_mode, db_host, db_port, db_name, user, pwd,
                         *args, **kwargs)
        self.sample_size = sample_size
        self.max_rounds = max_rounds
        self.total_relevant_ballots = sum(self.reported.tally.values())

        # FIXME: temporary until pairwise contest fix is implemented
        # Remember the true contest size, then make the contest look as if it
        # only contained relevant ballots before building the audit.
        self.contest_ballots = self.reported.contest_ballots
        self.reported.contest_ballots = self.total_relevant_ballots
        reported_winner = self.reported.reported_winners[0]
        self.reported.winner_prop = self.reported.tally[reported_winner] / self.reported.contest_ballots
        self.audit = Minerva(self.alpha, 1.0, self.reported)

        if sample_size < self.audit.min_sample_size:
            raise ValueError('Sample size is less than minimum sample size for audit.')
        if max_rounds < 2:
            raise ValueError('Maximum rounds is too small.')

        # FIXME: sorted candidate list will be created by new branch, update once merged
        # Cumulative thresholds for a tied election: the top candidate owns
        # the first half of the relevant ballots, everything up to
        # total_relevant_ballots goes to the next candidate, and the rest of
        # the contest is 'invalid'.
        ranked = sorted(self.reported.tally.items(), key=lambda item: item[1], reverse=True)
        self.vote_dist = [(ranked[0][0], self.total_relevant_ballots // 2)]
        self.vote_dist.extend((name, self.total_relevant_ballots) for name, _ in ranked[1:])
        self.vote_dist.append(('invalid', self.contest_ballots))

    def trial(self, seed):
        """Run one seeded multi-round Minerva audit (using r2b2.minerva.Minerva).

        Args:
            seed: Seed for the ``random`` module, making the trial repeatable.

        Returns:
            dict with keys ``stop``, ``round``, ``p_value_sched``,
            ``p_value``, ``relevant_sample_size_sched`` and
            ``winner_ballots_drawn_sched``. ``round`` is the stopping round,
            or ``max_rounds`` if the audit never stopped.
        """
        r.seed(seed)

        # Ensure audit is reset
        self.audit._reset()

        def outcome(stopped, round_reached):
            # FIXME: Improve output format
            return {
                'stop': stopped,
                'round': round_reached,
                'p_value_sched': self.audit.pvalue_schedule,
                'p_value': self.audit.get_risk_level(),
                'relevant_sample_size_sched': self.audit.rounds,
                'winner_ballots_drawn_sched': self.audit.sample_winner_ballots,
            }

        # Initialize first round with given initial sample size
        previous_sample_size = 0
        current_sample_size = self.sample_size
        round_num = 1

        # Cumulative per-bucket counts across all rounds of this trial.
        tallies = [0] * len(self.vote_dist)
        while round_num <= self.max_rounds:
            # Draw only the ballots that are new in this round; each goes to
            # the first bucket whose cumulative threshold it does not exceed.
            for _ in range(current_sample_size - previous_sample_size):
                drawn = r.randint(1, self.contest_ballots)
                for bucket in range(len(tallies)):
                    if drawn <= self.vote_dist[bucket][1]:
                        tallies[bucket] += 1
                        break

            winner_ballots = tallies[0]
            # The last bucket is 'invalid'; those ballots do not count.
            relevant_sample_size = current_sample_size - tallies[-1]

            # Perform audit computations
            self.audit.rounds.append(relevant_sample_size)
            self.audit.current_dist_null()
            self.audit.current_dist_reported()

            # Check whether the audit has completed
            finished = bool(self.audit.stopping_condition(winner_ballots))

            # Continue audit computations (done whether or not it stopped)
            kmin = self.audit.next_min_winner_ballots(relevant_sample_size)
            self.audit.min_winner_ballots.append(kmin)
            self.audit.truncate_dist_null()
            self.audit.truncate_dist_reported()
            self.audit.sample_winner_ballots.append(winner_ballots)

            # If audit is done, return trial output
            if finished:
                return outcome(True, round_num)

            # Else choose a random next round size and continue
            round_num += 1
            growth = math.ceil(self.sample_size * r.uniform(0.5, 1.5))
            previous_sample_size = current_sample_size
            current_sample_size += growth

        # If audit does not stop, return trial output
        return outcome(False, self.max_rounds)

    def analyze(self, verbose: bool = False, hist: bool = False):
        """Compute the experimental risk from the recorded trials.

        Args:
            verbose (bool): If true, print a summary of the analysis.
            hist (bool): If true, display a histogram of the rounds reached
                in stopped trials.

        Returns:
            Fraction of trials in which the audit stopped.
        """
        trials = self.db.trial_lookup(self.sim_id) if self.db_mode else self.trials

        num_trials = 0
        stopped = 0
        rounds_stopped = []
        # TODO: Create additinal structures to store trial data
        for outcome in trials:
            num_trials += 1
            if outcome['stop']:
                stopped += 1
                rounds_stopped.append(outcome['round'])
            # TODO: Extract more data from trial

        if verbose:
            print('Analysis\n========\n')
            print('Number of trials: {}'.format(num_trials))
            print('Experiemtnal Risk: {:.5f}'.format(stopped / num_trials))
            if stopped > 0:
                print('Average Rounds in Stopped Trials: {:.2f}'.format(sum(rounds_stopped) / stopped))

        if hist:
            histogram(rounds_stopped, 'Rounds reached in stopped trials.')

        experimental_risk = stopped / num_trials

        # Update simulation entry to include analysis
        if self.db_mode:
            self.db.update_analysis(self.sim_id, experimental_risk)
        return experimental_risk
[docs]class MinervaRandomMultiRoundStoppingProb(Simulation): """Simulate a multi-round Minerva audit for random subsequent sample sizes. The initial sample size, x, is given as input and further sample sizes are chosen randomly as an additioanl 0.5x to 1.5x ballots in the next round. The audit executes until it stops or reaches the maximum number of rounds. """ sample_size: int max_rounds: int total_relevant_ballots: int vote_dist: List[Tuple[str, int]] audit: Minerva def __init__(self, alpha, reported, sample_size, max_rounds, db_mode=True, db_host='localhost', db_name='r2b2', db_port=27017, user='writer', pwd='icanwrite', *args, **kwargs): if 'sim_args' in kwargs: kwargs['sim_args']['max_rounds'] = max_rounds else: kwargs['sim_args'] = {'max_rounds': max_rounds} super().__init__('minerva', alpha, reported, 'reported', True, db_mode, db_host, db_port, db_name, user, pwd, *args, **kwargs) self.sample_size = sample_size self.max_rounds = max_rounds self.total_relevant_ballots = sum(self.reported.tally.values()) # FIXME: temporary until pairwise contest fix is implemented self.contest_ballots = self.reported.contest_ballots self.reported.contest_ballots = self.total_relevant_ballots self.reported.winner_prop = self.reported.tally[self.reported.reported_winners[0]] / self.reported.contest_ballots self.audit = Minerva(self.alpha, 1.0, self.reported) if sample_size < self.audit.min_sample_size: raise ValueError('Sample size is less than minimum sample size for audit.') if max_rounds < 2: raise ValueError('Maximum rounds is too small.') # FIXME: sorted candidate list will be created by new branch, update once merged # Generate a sorted underlying vote distribution for a tied election sorted_tally = sorted(self.reported.tally.items(), key=lambda x: x[1], reverse=True) self.vote_dist = [(sorted_tally[0][0], sorted_tally[0][1])] current = sorted_tally[0][1] for i in range(1, len(sorted_tally)): current += sorted_tally[i][1] self.vote_dist.append((sorted_tally[i][0], 
current)) self.vote_dist.append(('invalid', self.contest_ballots))
def trial(self, seed):
    """Run one multi-round Minerva audit trial (using r2b2.minerva.Minerva).

    The random module is seeded with ``seed`` and the audit is reset. Each
    round draws ballots uniformly from ``1..self.contest_ballots`` (every
    draw is independent) and files each ballot under the first entry of
    ``self.vote_dist`` whose upper bound is not smaller than the ballot
    number. Tallies are cumulative across rounds. The first round draws
    ``self.sample_size`` ballots; every later round draws
    ``ceil(self.sample_size * u)`` more, with ``u`` uniform on [0.5, 1.5].

    Args:
        seed: Seed handed to ``random.seed`` so the trial is reproducible.

    Returns:
        dict: ``stop`` (whether the stopping condition was met), ``round``
        (round in which the audit stopped, or ``self.max_rounds`` if it
        never stopped), ``p_value`` (from ``audit.get_risk_level()``) and
        the audit schedules ``p_value_sched``,
        ``relevant_sample_size_sched``, ``winner_ballots_drawn_sched`` and
        ``kmin_sched``.
    """
    r.seed(seed)

    # Every trial starts from a clean audit state
    self.audit._reset()

    def summary(stopped, rounds_run):
        # Package the audit schedules in the trial output format
        # FIXME: Improve output format
        return {
            'stop': stopped,
            'round': rounds_run,
            'p_value_sched': self.audit.pvalue_schedule,
            'p_value': self.audit.get_risk_level(),
            'relevant_sample_size_sched': self.audit.rounds,
            'winner_ballots_drawn_sched': self.audit.sample_winner_ballots,
            'kmin_sched': self.audit.min_winner_ballots
        }

    # One cumulative counter per vote_dist entry; the last entry holds the
    # ballots that are not relevant to the audit
    tallies = [0] * len(self.vote_dist)
    drawn_so_far = 0
    to_draw = self.sample_size
    round_num = 1

    while round_num <= self.max_rounds:
        # Draw this round's additional ballots
        for _ in range(to_draw):
            ballot = r.randint(1, self.contest_ballots)
            for slot, entry in enumerate(self.vote_dist):
                if ballot <= entry[1]:
                    tallies[slot] += 1
                    break
        drawn_so_far += to_draw
        relevant_sample_size = drawn_so_far - tallies[-1]

        # Audit computations for the current round
        self.audit.rounds.append(relevant_sample_size)
        self.audit.current_dist_null()
        self.audit.current_dist_reported()
        stopped = bool(self.audit.stopping_condition(tallies[0]))

        # The round is fully recorded in the audit even when it stops
        kmin = self.audit.next_min_winner_ballots(relevant_sample_size)
        self.audit.min_winner_ballots.append(kmin)
        self.audit.truncate_dist_null()
        self.audit.truncate_dist_reported()
        self.audit.sample_winner_ballots.append(tallies[0])

        if stopped:
            return summary(True, round_num)

        # Otherwise pick the size of the next round and go on
        round_num += 1
        to_draw = math.ceil(self.sample_size * r.uniform(0.5, 1.5))

    # The audit never met its stopping condition
    return summary(False, self.max_rounds)
def analyze(self, verbose: bool = False, hist: bool = False):
    """Analyze trials to get the experimental stopping probability.

    Trials are read from the database when ``self.db_mode`` is set and from
    ``self.trials`` otherwise. Each trial must provide the keys ``stop``
    and ``round``.

    Args:
        verbose (bool): If true, print the number of trials, the stopping
            probability and, when at least one trial stopped, the average
            number of rounds in stopped trials.
        hist (bool): If true, generate a histogram of the round reached in
            each stopped trial.

    Returns:
        float: Fraction of trials that stopped. When there are no trials
        this is 0.0 (instead of raising ZeroDivisionError) and nothing is
        written to the database.
    """
    trials = self.db.trial_lookup(self.sim_id) if self.db_mode else self.trials

    num_trials = 0
    rounds_stopped = []
    # TODO: Create additional structures to store trial data
    for trial in trials:
        num_trials += 1
        if trial['stop']:
            rounds_stopped.append(trial['round'])
        # TODO: Extract more data from trial

    stopped = len(rounds_stopped)
    # Guard against a simulation without any trials
    stopping_prob = stopped / num_trials if num_trials else 0.0

    if verbose:
        print('Analysis\n========\n')
        print('Number of trials: {}'.format(num_trials))
        print('Stopping Probability: {:%}'.format(stopping_prob))
        if stopped > 0:
            print('Average Rounds in Stopped Trials: {:.2f}'.format(sum(rounds_stopped) / stopped))

    if hist:
        histogram(rounds_stopped, 'Rounds reached in stopped trials.')

    # Update simulation entry to include analysis (only if there is one)
    if self.db_mode and num_trials:
        self.db.update_analysis(self.sim_id, stopping_prob)

    return stopping_prob
class MinervaMultiRoundAlteredMargin(Simulation):
    """Simulate a Minerva audit for a given sample size with a correct outcome but incorrect reported margin.

    Ballots are sampled from an underlying distribution in which the
    candidate with the largest reported tally receives a
    ``(1 + underlying_margin) / 2`` share of the relevant ballots.
    """

    # Margin of the underlying distribution the ballots are sampled from
    underlying_margin: float
    # Number of ballots drawn in the first round
    sample_size: int
    # Maximum number of rounds executed per trial
    max_rounds: int
    # Sum of all reported candidate tallies
    total_relevant_ballots: int
    # (name, cumulative upper bound) pairs used to classify a drawn ballot
    vote_dist: List[Tuple[str, int]]
    audit: Minerva

    def __init__(self,
                 alpha,
                 reported,
                 underlying,
                 underlying_margin,
                 sample_size,
                 max_rounds,
                 db_mode=True,
                 db_host='localhost',
                 db_name='r2b2',
                 db_port=27017,
                 user='writer',
                 pwd='icanwrite',
                 *args,
                 **kwargs):
        """Create the simulation and the Minerva audit it drives.

        Args:
            alpha: Risk limit, passed to the Simulation base class and to
                the Minerva audit.
            reported: Reported contest; its tally, contest_ballots and
                reported_winners are read here.
            underlying: Stored under the 'change' key of the underlying
                description passed to the base class.
            underlying_margin (float): Margin of the underlying distribution.
            sample_size (int): First round sample size.
            max_rounds (int): Maximum number of rounds per trial.
            db_mode, db_host, db_name, db_port, user, pwd: Forwarded to the
                Simulation base class.

        Raises:
            ValueError: If sample_size is below the audit's minimum sample size.
        """
        # Record max_rounds in the simulation arguments. The original wrote to
        # the misspelled key 'sim__args', raising KeyError whenever the caller
        # supplied sim_args. A copy is updated so the caller's dict is untouched.
        sim_args = dict(kwargs.get('sim_args') or {})
        sim_args['max_rounds'] = max_rounds
        kwargs['sim_args'] = sim_args

        super().__init__('minerva', alpha, reported, {
            'change': underlying,
            'margin': underlying_margin
        }, True, db_mode, db_host, db_port, db_name, user, pwd, *args, **kwargs)
        self.underlying_margin = underlying_margin
        self.sample_size = sample_size
        self.max_rounds = max_rounds
        self.total_relevant_ballots = sum(self.reported.tally.values())
        # FIXME: temporary until pairwise contest fix is implemented
        # Keep the real contest size for sampling, then make the reported
        # contest look as if it only contained the relevant ballots
        self.contest_ballots = self.reported.contest_ballots
        self.reported.contest_ballots = self.total_relevant_ballots
        self.reported.winner_prop = self.reported.tally[self.reported.reported_winners[0]] / self.reported.contest_ballots
        self.audit = Minerva(self.alpha, 1.0, self.reported)

        if sample_size < self.audit.min_sample_size:
            raise ValueError('Sample size is less than minimum sample size for audit')

        # FIXME: sorted candidate list will be created by new branch, update once merged
        # Generate a sorted underlying vote distribution
        sorted_tally = sorted(self.reported.tally.items(), key=lambda x: x[1], reverse=True)
        underlying_winner_prop = (1.0 + underlying_margin) / 2.0
        # NOTE: the first upper bound is a float; it is only ever compared
        # against integer ballot numbers
        self.vote_dist = [(sorted_tally[0][0], self.total_relevant_ballots * underlying_winner_prop)]
        for name, _ in sorted_tally[1:]:
            self.vote_dist.append((name, self.total_relevant_ballots))
        self.vote_dist.append(('invalid', self.contest_ballots))

    def trial(self, seed):
        """Execute a multiround minerva audit.

        The first round draws ``self.sample_size`` ballots; each later round
        draws ``ceil(self.sample_size * u)`` more, with ``u`` uniform on
        [0.5, 1.5]. Tallies are cumulative across rounds.

        Args:
            seed: Seed handed to ``random.seed`` so the trial is reproducible.

        Returns:
            dict: ``stop``, ``round``, ``p_value`` and the audit schedules
            ``p_value_sched``, ``relevant_sample_size_sched``,
            ``winner_ballots_drawn_sched`` and ``kmin_sched``.
        """
        r.seed(seed)

        # Ensure audit is reset
        self.audit._reset()

        # Initialize first round with given initial sample size
        round_num = 1
        previous_sample_size = 0
        current_sample_size = self.sample_size
        stop = False

        # Cumulative count per vote_dist entry; last entry counts invalid ballots
        sample = [0] * len(self.vote_dist)

        while round_num <= self.max_rounds:
            # Draw the additional ballots for the current round
            for _ in range(current_sample_size - previous_sample_size):
                ballot = r.randint(1, self.contest_ballots)
                for j, entry in enumerate(self.vote_dist):
                    if ballot <= entry[1]:
                        sample[j] += 1
                        break

            relevant_sample_size = current_sample_size - sample[-1]

            # Perform audit computations
            self.audit.rounds.append(relevant_sample_size)
            self.audit.current_dist_null()
            self.audit.current_dist_reported()

            # Check if audit is completed
            if self.audit.stopping_condition(sample[0]):
                stop = True

            # Continue audit computations
            kmin = self.audit.next_min_winner_ballots(relevant_sample_size)
            self.audit.min_winner_ballots.append(kmin)
            self.audit.truncate_dist_null()
            self.audit.truncate_dist_reported()
            self.audit.sample_winner_ballots.append(sample[0])

            # If audit is done, return trial output
            if stop:
                return {
                    'stop': stop,
                    'round': round_num,
                    'p_value_sched': self.audit.pvalue_schedule,
                    'p_value': self.audit.get_risk_level(),
                    'relevant_sample_size_sched': self.audit.rounds,
                    'winner_ballots_drawn_sched': self.audit.sample_winner_ballots,
                    'kmin_sched': self.audit.min_winner_ballots
                }

            # Else choose a next round size and continue
            round_num += 1
            sample_mult = r.uniform(0.5, 1.5)
            next_sample = math.ceil(self.sample_size * sample_mult)
            previous_sample_size = current_sample_size
            current_sample_size += next_sample

        # If audit does not stop, return trial output
        return {
            'stop': stop,
            'round': self.max_rounds,
            'p_value_sched': self.audit.pvalue_schedule,
            'p_value': self.audit.get_risk_level(),
            'relevant_sample_size_sched': self.audit.rounds,
            'winner_ballots_drawn_sched': self.audit.sample_winner_ballots,
            'kmin_sched': self.audit.min_winner_ballots
        }

    def analyze(self, verbose: bool = False, hist: bool = False):
        """Analyse trials to get experimental stopping probability.

        Trials are read from the database when ``self.db_mode`` is set and
        from ``self.trials`` otherwise. Each trial must provide the keys
        ``stop`` and ``p_value``.

        Args:
            verbose (bool): Accepted for interface compatibility; not used yet.
            hist (bool): Accepted for interface compatibility; not used yet.

        Returns:
            dict: ``avg_p_value`` (mean of the trial p-values) and ``sprob``
            (fraction of trials that stopped). Both are 0.0 when there are
            no trials, and in that case nothing is written to the database.
        """
        trials = self.db.trial_lookup(self.sim_id) if self.db_mode else self.trials

        num_trials = 0
        stopped = 0
        total_risk = 0.0
        for trial in trials:
            num_trials += 1
            total_risk += trial['p_value']
            if trial['stop']:
                stopped += 1

        # TODO: insert verbose and histograms. The winner ballots of a trial
        # are the last entry of its 'winner_ballots_drawn_sched'; the trial
        # output has no 'winner_ballots' key.

        # Built unconditionally so it is also available when db_mode is off;
        # the division is guarded against a simulation without trials
        if num_trials:
            analysis = {'avg_p_value': (total_risk / num_trials), 'sprob': (stopped / num_trials)}
        else:
            analysis = {'avg_p_value': 0.0, 'sprob': 0.0}

        # Update simulation entry to include analysis
        if self.db_mode and num_trials:
            self.db.update_analysis(self.sim_id, analysis)

        return analysis