Source code for r2b2.audit

"""Abstract module defining an Audit framework."""
import math
from abc import ABC
from abc import abstractmethod
from typing import Dict
from typing import List

import click
import numpy as np
from scipy.signal import convolve
from scipy.stats import binom
from scipy.stats import hypergeom

from r2b2.contest import Contest
from r2b2.contest import PairwiseContest


[docs]class PairwiseAudit(): """Store audit information for pairwise comparison. The PairwiseAudit class hold the audit data for a single pair of candidates. Attributes: sub_contest (PairwiseContest): Pairwise contest data for relevant pair of candidates. min_sample_size (int): The smallest valid sample size. The minimum round size where kmin <= round risk_schedule (List[float]): Schedule of risk assocaited with each previous round. Corresponds to tail of null distribution. stopping_prob_schedule (List[float]): Schedule of stopping probabilities associated with each previous round. Corresponds to tail of reported tally distribution. pvalue_schedule (List[float]): Schedule of pvalues associated with each previous round. Corresponds to ratio of risk and stopping probability. distribution_null (Dict[str, List[float]]): Current distribution associated with a tied election for each pairwise subcontest. distribution_reported_tally (Dict[str, List[float]]): Current distribution associated with reported tally for each pairwise subcontest. min_winner_ballots (List[int]): List of stopping sizes (kmin values) for each round. stopped (bool): Indicates if pairwise audit has stopped. """ sub_contest: PairwiseContest min_sample_size: int risk_schedule: List[float] stopping_prob_schedule: List[float] pvalue_schedule: List[float] distribution_null: List[float] distribution_reported_tally: List[float] min_winner_ballots: List[int] stopped: bool def __init__(self, sub_contest: PairwiseContest): self.sub_contest = sub_contest self.min_sample_size = 1 self.risk_schedule = [] self.stopping_prob_schedule = [] self.pvalue_schedule = [] self.distribution_null = [1.0] self.distribution_reported_tally = [1.0] self.min_winner_ballots = [] self.stopped = False
[docs] def __repr__(self): return 'winner: {}, loser: {}, min_sample_size: {}, risk_schedule: {}, sprob_schedule: {}, \ pvalue_schedule: {}, min_winner_ballots: {}, stopped: {}'.format(self.sub_contest.reported_winner, self.sub_contest.reported_loser, self.min_sample_size, self.risk_schedule, self.stopping_prob_schedule, self.pvalue_schedule, self.min_winner_ballots, self.stopped)
[docs] def __str__(self): title_str = 'Pairwise Audit\n--------------\n' sub_contest_str = 'Subcontest Winner: {}\n'.format(self.sub_contest.reported_winner) sub_contest_str += 'Subcontest Loser: {}\n'.format(self.sub_contest.reported_loser) min_sample_size_str = 'Minimum Sample Size: {}\n'.format(self.min_sample_size) risk_sched_str = 'Risk Schedule: {}\n'.format(self.risk_schedule) sprob_sched_str = 'Stopping Probability Schedule: {}\n'.format(self.stopping_prob_schedule) pval_str = 'p-Value Schedule: {}\n'.format(self.pvalue_schedule) min_win_ballot_str = 'Minimum Winner Ballots: {}\n'.format(self.min_winner_ballots) stop_str = 'Stopped: {}\n'.format(self.stopped) return (title_str + sub_contest_str + min_sample_size_str + risk_sched_str + sprob_sched_str + pval_str + min_win_ballot_str + stop_str + '\n')
def _reset(self): self.risk_schedule = [] self.stopping_prob_schedule = [] self.pvalue_schedule = [] self.distribution_null = [1.0] self.distribution_reported_tally = [1.0] self.min_winner_ballots = [] self.stopped = False
[docs] def get_pair_str(self): """Get winner-loser pair as string used as dictionary key in Audit.""" return self.sub_contest.reported_winner + '-' + self.sub_contest.reported_loser
[docs]class Audit(ABC): """Abstract Base Class to define a general Audit object type. The Audit class is an abstract base class which defines the general structure and properties of a risk-limiting audit. Individual RLAs are subclasses of the Audit class. Attributes: alpha (float): Risk limit. Alpha represents the chance that given an incorrectly called election, the audit will fail to go to a full recount. beta (float): the worst case chance of causing an unnecessary full recount. For many RLAs, beta will simply be set to 0 and will not appear to be a parameter. max_fraction_to_draw (float): The maximum total number of ballots auditors are willing to draw during the course of the audit. replacement (bool): Indicates if the audit sampling should be done with (true) or without (false) replacement. rounds (List[int]): List of round sizes (i.e. sample sizes). sample_ballots (Dict[str, List[int]]): Dictionary mapping candidates to sample counts drawn throughout audit. Sample counts are cumulative. pvalue_schedule (List[float]): Schedule of pvalues for overall audit in each round. In each round, the overall pvalue is the maximum pvalue of all subaudits. contest (Contest): Contest on which to run the audit. sub_audits (Dict[str, PairwiseAudit]): Dict of PairwiseAudits wthin audit indexed by loser. stopped (bool): Indicates if the audit has stopped (i.e. met the risk limit). """ alpha: float beta: float max_fraction_to_draw: float replacement: bool rounds: List[int] sample_winner_ballots: List[int] pvalue_schedule: List[float] contest: Contest sample_ballots: Dict[str, List[int]] sub_audits: Dict[str, PairwiseAudit] stopped: bool def __init__(self, alpha: float, beta: float, max_fraction_to_draw: float, replacement: bool, contest: Contest): """Create an instance of an Audit. Note: This should only be called when initializing a subclass as the Audit class is an abstract class. """ if type(alpha) is not float: raise TypeError('alpha must be a float.') if alpha < 0 or alpha > 1.0: raise ValueError('alpha value must be between 0 and 1.') if type(beta) is not float: raise TypeError('beta must be a float.') if beta < 0 or beta > 1.0: raise ValueError('beta must be between 0 and 1.') if type(max_fraction_to_draw) is not float: raise TypeError('max_fraction_to_draw must be a fraction (i.e. float).') if max_fraction_to_draw < 0 or max_fraction_to_draw > 1: raise ValueError('max_fraction_to_draw must be between 0 and 1') if type(replacement) is not bool: raise TypeError('replacement must be boolean.') if type(contest) is not Contest: raise TypeError('contest must be a Contest object') self.alpha = alpha self.beta = beta self.max_fraction_to_draw = max_fraction_to_draw self.replacement = replacement self.contest = contest self.rounds = [] self.sample_winner_ballots = [] self.pvalue_schedule = [] self.stopped = False self.sample_ballots = {} for candidate in self.contest.candidates: self.sample_ballots[candidate] = [] # Get pairwise subcontests for reported winners and create pairwise audits self.sub_audits = {} for sub_contest in self.contest.sub_contests: self.sub_audits[sub_contest.reported_winner + '-' + sub_contest.reported_loser] = PairwiseAudit(sub_contest)
[docs] def __repr__(self): """String representation of Audit class. Note: Can (and perhaps should) be overwritten in subclass. """ return '{}: [{}, {}, {}, {}, {}]'.format(self.__class__.__name__, self.alpha, self.beta, self.max_fraction_to_draw, self.replacement, repr(self.contest))
[docs] def __str__(self): """Human readable string (i.e. printable) representation of Audit class. Note: Can (and perhaps should) be overwritten in subclass. """ title_str = 'Audit\n-----\n' alpha_str = 'Alpha: {}\n'.format(self.alpha) beta_str = 'Beta: {}\n'.format(self.beta) max_frac_str = 'Maximum Fraction to Draw: {}\n'.format(self.max_fraction_to_draw) replacement_str = 'Replacement: {}\n\n'.format(self.replacement) return title_str + alpha_str + beta_str + max_frac_str + replacement_str + str(self.contest)
[docs] def current_dist_null(self): """Update distribution_null for each sub audit for current round.""" if len(self.rounds) == 0: raise Exception('No rounds exist.') # For each pairwise sub audit, update null distribution for sub_audit in self.sub_audits.values(): # Update pairwise distribution using pairwise sample total as round size self._current_dist_null_pairwise(sub_audit)
[docs] def _current_dist_null_pairwise(self, sub_audit: PairwiseAudit, bulk_use_round_size=False): """Update distribution_null for a single PairwiseAudit Args: sub_audit (PairwiseAudit): Pairwise subaudit for which to update distribution. bulk_use_round_size (bool): Optional argument used by bulk methods. Since the bulk methods do not sample ballots for candidates during the rounds, this flag simply uses the round schedule as the round draw (instead of the pairwise round draw) for updating the distribution. Default is False. """ pair = sub_audit.get_pair_str() # If not first round get marginal sample if bulk_use_round_size: if len(self.rounds) == 1: round_draw = self.rounds[0] else: round_draw = self.rounds[-1] - self.rounds[-2] elif len(self.rounds) == 1: if len(self.sample_ballots[sub_audit.sub_contest.reported_loser]) != len(self.rounds): raise Exception('Currently {} rounds, but only {} samples for {}.'.format( len(self.rounds), len(self.sample_ballots[sub_audit.sub_contest.reported_loser]), sub_audit.sub_contest.reported_loser)) if len(self.sample_ballots[sub_audit.sub_contest.reported_winner]) != len(self.rounds): raise Exception('Currently {} rounds, but only {} samples for {}.'.format( len(self.rounds), len(self.sample_ballots[sub_audit.sub_contest.reported_winner]), sub_audit.stopped.reported_winner)) round_draw = self.sample_ballots[sub_audit.sub_contest.reported_loser][0] + self.sample_ballots[ sub_audit.sub_contest.reported_winner][0] else: if len(self.sample_ballots[sub_audit.sub_contest.reported_loser]) != len(self.rounds): raise Exception('Currently {} rounds, but only {} samples for {}.'.format( len(self.rounds), len(self.sample_ballots[sub_audit.sub_contest.reported_loser]), sub_audit.sub_contest.reported_loser)) if len(self.sample_ballots[sub_audit.sub_contest.reported_winner]) != len(self.rounds): raise Exception('Currently {} rounds, but only {} samples for {}.'.format( len(self.rounds), len(self.sample_ballots[sub_audit.sub_contest.reported_winner]), sub_audit.stopped.reported_winner)) round_draw = (self.sample_ballots[sub_audit.sub_contest.reported_loser][-1] - self.sample_ballots[sub_audit.sub_contest.reported_loser][-2]) + ( self.sample_ballots[sub_audit.sub_contest.reported_winner][-1] - self.sample_ballots[sub_audit.sub_contest.reported_winner][-2]) # Distribution updating is dependent on sampling with or without replacement if self.replacement: distribution_round_draw = binom.pmf(range(0, round_draw + 1), round_draw, 0.5) # Compute convolution to get new distribution (except 1st round) if len(self.rounds) == 1: self.sub_audits[pair].distribution_null = distribution_round_draw else: self.sub_audits[pair].distribution_null = convolve(self.sub_audits[pair].distribution_null, distribution_round_draw, method='direct') self.sub_audits[pair].distribution_null = [abs(p) for p in self.sub_audits[pair].distribution_null] else: half_contest_ballots = math.floor(sub_audit.sub_contest.contest_ballots / 2) if len(self.rounds) == 1: # Simply compute hypergeometric for 1st round distribution self.sub_audits[pair].distribution_null = hypergeom.pmf(np.arange(round_draw + 1), sub_audit.sub_contest.contest_ballots, half_contest_ballots, round_draw) else: distribution_round_draw = [0 for i in range(self.rounds[-1] + 1)] # Get relevant interval of previous round distribution interval = self.__get_interval(self.sub_audits[pair].distribution_null) # For every possible number of winner ballots in previous rounds # and every possibility in the current round # compute probability of their simultaneity for prev_round_possibility in range(interval[0], interval[1] + 1): unsampled_contest_ballots = sub_audit.sub_contest.contest_ballots - self.rounds[-2] unsampled_winner_ballots = half_contest_ballots - prev_round_possibility curr_round_draw = hypergeom.pmf(np.arange(round_draw + 1), unsampled_contest_ballots, unsampled_winner_ballots, round_draw) for curr_round_possibility in range(round_draw + 1): component_prob = sub_audit.distribution_null[prev_round_possibility] * curr_round_draw[curr_round_possibility] distribution_round_draw[prev_round_possibility + curr_round_possibility] += component_prob self.sub_audits[pair].distribution_null = distribution_round_draw
[docs] def current_dist_reported(self): """Update distribution_reported_tally for each subaudit for current round.""" if len(self.rounds) == 0: raise Exception('No rounds exist') # For each pairwise sub audit, update dist_reported for sub_audit in self.sub_audits.values(): # Update distr_reported using pairwise round size self._current_dist_reported_pairwise(sub_audit)
[docs] def _current_dist_reported_pairwise(self, sub_audit: PairwiseAudit, bulk_use_round_size=False): """Update dist_reported for a single PairwiseAudit. Args: sub_audit (PairwiseAudit): Pairwise subaudit for which to update distriution. bulk_use_round_size (bool): Optional argument used by bulk methods. Since the bulk methods do not sample ballots for candidates during the rounds, this flag simply uses the round schedule as the round draw (instead of the pairwise round draw) for updating the distribution. Default is False. """ pair = sub_audit.get_pair_str() # If not first round get marginal sample if bulk_use_round_size: if len(self.rounds) == 1: round_draw = self.rounds[0] else: round_draw = self.rounds[-1] - self.rounds[-2] elif len(self.rounds) == 1: if len(self.sample_ballots[sub_audit.sub_contest.reported_loser]) != len(self.rounds): raise Exception('Currently {} rounds, but only {} samples for {}.'.format( len(self.rounds), len(self.sample_ballots[sub_audit.sub_contest.reported_loser]), sub_audit.sub_contest.reported_loser)) if len(self.sample_ballots[sub_audit.sub_contest.reported_winner]) != len(self.rounds): raise Exception('Currently {} rounds, but only {} samples for {}.'.format( len(self.rounds), len(self.sample_ballots[sub_audit.sub_contest.reported_winner]), sub_audit.stopped.reported_winner)) round_draw = self.sample_ballots[sub_audit.sub_contest.reported_loser][0] + self.sample_ballots[ sub_audit.sub_contest.reported_winner][0] else: if len(self.sample_ballots[sub_audit.sub_contest.reported_loser]) != len(self.rounds): raise Exception('Currently {} rounds, but only {} samples for {}.'.format( len(self.rounds), len(self.sample_ballots[sub_audit.sub_contest.reported_loser]), sub_audit.sub_contest.reported_loser)) if len(self.sample_ballots[sub_audit.sub_contest.reported_winner]) != len(self.rounds): raise Exception('Currently {} rounds, but only {} samples for {}.'.format( len(self.rounds), len(self.sample_ballots[sub_audit.sub_contest.reported_winner]), sub_audit.stopped.reported_winner)) round_draw = (self.sample_ballots[sub_audit.sub_contest.reported_loser][-1] - self.sample_ballots[sub_audit.sub_contest.reported_loser][-2]) + ( self.sample_ballots[sub_audit.sub_contest.reported_winner][-1] - self.sample_ballots[sub_audit.sub_contest.reported_winner][-2]) if self.replacement: distribution_round_draw = binom.pmf(range(0, round_draw + 1), round_draw, sub_audit.sub_contest.winner_prop) if len(self.rounds) == 1: self.sub_audits[pair].distribution_reported_tally = distribution_round_draw else: self.sub_audits[pair].distribution_reported_tally = convolve(sub_audit.distribution_reported_tally, distribution_round_draw, method='direct') self.sub_audits[pair].distribution_reported_tally = [abs(p) for p in self.sub_audits[pair].distribution_reported_tally] else: reported_winner_ballots = int(sub_audit.sub_contest.winner_prop * sub_audit.sub_contest.contest_ballots) if len(self.rounds) == 1: # Simply compute hypergeometric for 1st round distribution self.sub_audits[pair].distribution_reported_tally = hypergeom.pmf(np.arange(round_draw + 1), sub_audit.sub_contest.contest_ballots, reported_winner_ballots, round_draw) else: distribution_round_draw = [0 for i in range(self.rounds[-1] + 1)] # Get relevant interval of previous round distribution interval = self.__get_interval(sub_audit.distribution_reported_tally) # For every possible number of winner ballots in previous rounds # and every possibility in the current round # compute probability of their simultaneity for prev_round_possibility in range(interval[0], interval[1] + 1): unsampled_contest_ballots = self.sub_audits[pair].sub_contest.contest_ballots - self.rounds[-2] unsampled_winner_ballots = reported_winner_ballots - prev_round_possibility curr_round_draw = hypergeom.pmf(np.arange(round_draw + 1), unsampled_contest_ballots, unsampled_winner_ballots, round_draw) for curr_round_possibility in range(round_draw + 1): component_prob = self.sub_audits[pair].distribution_reported_tally[prev_round_possibility] * curr_round_draw[ curr_round_possibility] distribution_round_draw[prev_round_possibility + curr_round_possibility] += component_prob self.sub_audits[pair].distribution_reported_tally = distribution_round_draw
[docs] def truncate_dist_null(self): """Update risk schedule and truncate null distribution for each subaudit.""" for pair in self.sub_audits.keys(): self._truncate_dist_null_pairwise(pair)
[docs] def _truncate_dist_null_pairwise(self, pair: str): """Update risk schedule and truncate null distribution for a single subaudit. Args: pair (str): Dictionary key for subaudit (within the audit's subaudits) to truncate distribution and update risk schedule. """ self.sub_audits[pair].risk_schedule.append( sum(self.sub_audits[pair].distribution_null[self.sub_audits[pair].min_winner_ballots[-1]:])) self.sub_audits[pair].distribution_null = self.sub_audits[pair].distribution_null[:self.sub_audits[pair].min_winner_ballots[-1]]
[docs] def truncate_dist_reported(self): """Update stopping prob schedule and truncate reported distribution for each subaudit.""" for pair in self.sub_audits.keys(): self._truncate_dist_reported_pairwise(pair)
[docs] def _truncate_dist_reported_pairwise(self, pair): """Update stopping prob schedule and truncate reported distribution for a single subaudit. Args: pair (str): Dictionary key for subaudit (within the audit's subaudits) to truncate distribution and update stopping prob schedule. """ self.sub_audits[pair].stopping_prob_schedule.append( sum(self.sub_audits[pair].distribution_reported_tally[self.sub_audits[pair].min_winner_ballots[-1]:])) self.sub_audits[pair].distribution_reported_tally = self.sub_audits[pair].distribution_reported_tally[:self.sub_audits[pair]. min_winner_ballots[-1]]
def __get_interval(self, dist: List[float]): """Get relevant interval [l, u] of given distribution. Find levels l and u such that cdf(l) < tolerance and 1 - cdf(u) < tolerance. The purpose of this is to improve efficiency in the current_dist_* functions for audits without replacement where almost all of the hypergeometric distribution falls in a fraction of its range, i.e. between l and u. Note: cdf() in this context does not require cdf(infinity) = 1, although the distribution should sum very closely to 1. """ tolerance = 0.0000001 interval = [0, len(dist) - 1] lower_sum = 0 upper_sum = 0 # Handle the edge case of a small distribution if sum(dist) < 2 * tolerance: return interval for i in range(len(dist) - 1): lower_sum += dist[i] if (lower_sum + dist[i + 1]) > tolerance: interval[0] = i break for i in range(len(dist) - 1, 0, -1): upper_sum += dist[i] if (upper_sum + dist[i - 1]) > tolerance: interval[1] = i break return interval
[docs] def asn(self, pair: str): """Compute ASN as described in BRAVO paper for pair of candidates. Given the fractional margin for the reported winner and the risk limit (alpha) produce the average number of ballots sampled during the audit. Args: pair (str): Dictionary key referencing pairwise audit in audit's subaudits. Returns: int: ASN value. """ winner_prop = self.sub_audits[pair].sub_contest.winner_prop loser_prop = 1.0 - winner_prop margin = (2 * winner_prop) - 1 z_w = math.log(margin + 1) z_l = math.log(1 - margin) top = (math.log(1.0 / self.alpha) + (z_w / 2.0)) bottom = (winner_prop * z_w) + (loser_prop * z_l) return math.ceil(top / bottom)
[docs] def execute_round(self, sample_size: int, sample: dict, verbose: bool = False) -> bool: """Execute a single, non-interactive audit round. Executes 1 round of the audit, given its current state. If the audit is stopped, its state will not be modified. Warning: This method assumes the audit is in the correct state to be executed. If multiple executions of a full audit will be run the same audit object, make sure to call reset on the audit object between full executions. Args: sample_size (int): Total ballots sampled by the end of this round (cumulative). sample (dict): Sample counts for each candidate by the end of this round (cumulative). Returns: bool: True if the audit met its stopping condition by this round. """ if self.stopped: if verbose: click.echo('Audit already met the stopping condition in a previous round.') return True if len(self.rounds) > 0 and sample_size <= self.rounds[-1]: raise ValueError('Invlaid sample size, must be larger than previous round.') if len(self.rounds) > 0: for candidate, tally in sample.items(): if tally < self.sample_ballots[candidate][-1]: raise ValueError('Invalid sample count. Candidate {}\'s sample tally cannot decrease.'.format(candidate)) if len(self.sample_ballots[candidate]) != len(self.rounds): raise Exception('There are currently {} rounds, but only {} sample tallys for candidate {}.'.format( len(self.rounds), len(self.sample_ballots[candidate]), candidate)) if len(sample) != self.contest.num_candidates: raise Exception('Sample must include tally for all candidates.') self.rounds.append(sample_size) for candidate, tally in sample.items(): self.sample_ballots[candidate].append(tally) self.current_dist_null() self.current_dist_reported() self.stopped = self.stopping_condition(verbose) if self.stopped: if verbose: click.echo('Audit had met stopping condition') return True self.next_min_winner_ballots(verbose) self.truncate_dist_null() self.truncate_dist_reported() return False
[docs] def run(self, verbose: bool = False): """Begin interactive audit execution. Begins the interactive version of the audit. While computations for different audits will vary, the process for executing each one is the same. This provides a process for selecting a sample size, determining if the ballots found for the reported winner in that sample size meet the stopping condition(s), and if not continuing with the audit. As the audit proceeds, data including round sizes, ballots for the winner in each round size, and per round risk and stopping probability are stored. """ self._reset() click.echo('\n==================\nBeginning Audit...\n==================\n') # FIXME: no overall minimum sample size exists, so max of all sub audit mins used sample_size = max(sub_audit.min_sample_size for sub_audit in self.sub_audits.values()) max_sample_size = self.contest.contest_ballots * self.max_fraction_to_draw prev_sample_size = 0 curr_round = 0 while sample_size < max_sample_size: curr_round += 1 print('\n----------\n{:^10}\n----------\n'.format('Round {}'.format(curr_round))) if verbose and curr_round > 1: click.echo('\n+--------------------------------------------------+') click.echo('|{:^50}|'.format('Audit Statistics')) click.echo('|{:50}|'.format(' ')) # FIXME: no more notion of overall audit minimum sample size # click.echo('|{:<50}|'.format('Minimum Sample Size: {}'.format(self.min_sample_size))) click.echo('|{:<30}{:<20}|'.format('Maximum Sample Size: ', max_sample_size)) click.echo('|{:<30}{:<20}|'.format('Risk-Limit: ', self.alpha)) click.echo('|{:<30}{:<20.12f}|'.format('Current Risk Level: ', self.get_risk_level())) click.echo('|{:50}|'.format(' ')) click.echo('|{:^24}|{:^25}|'.format('Round', 'p-value')) click.echo('|------------------------|-------------------------|') for r in range(1, curr_round): click.echo('|{:^24}|{:^25}|'.format(r, '{:.12f}'.format(self.pvalue_schedule[r - 1]))) click.echo('+--------------------------------------------------+') # Get next round sample size given desired stopping probability while click.confirm('Would you like to enter a desired stopping probability for this round?'): desired_sprob = click.prompt('Enter desired stopping probability for this round (.9 recommended)', type=click.FloatRange(0, 1)) next_sample_size = self.next_sample_size(desired_sprob) click.echo('{:<50}'.format('Recommended next sample size: {}'.format(next_sample_size))) if curr_round > 1: prev_sample_size = sample_size sample_size = click.prompt('Enter next sample size (as a running total)', type=click.IntRange(prev_sample_size + 1, max_sample_size)) self.rounds.append(sample_size) # Get sample counts for each candidate drawn in this round sample_size_remaining = sample_size for candidate in self.contest.candidates: if curr_round == 1: previous_votes_for_candidate = 0 else: previous_votes_for_candidate = self.sample_ballots[candidate][-1] if candidate in self.contest.reported_winners: votes_for_candidate = click.prompt( 'Enter total number of votes for {} (reported winner) found in sample'.format(candidate), type=click.IntRange(previous_votes_for_candidate, previous_votes_for_candidate + sample_size_remaining)) else: votes_for_candidate = click.prompt('Enter total number of votes for {} found in sample'.format(candidate), type=click.IntRange(previous_votes_for_candidate, previous_votes_for_candidate + sample_size_remaining)) self.sample_ballots[candidate].append(votes_for_candidate) sample_size_remaining -= votes_for_candidate # Update all pairwise distributions self.current_dist_null() self.current_dist_reported() # Evaluate Stopping Condition self.stopped = self.stopping_condition(verbose) click.echo('\n\n+----------------------------------------+') click.echo('|{:^40}|'.format('Stopping Condition Met? {}'.format(self.stopped))) click.echo('+----------------------------------------+') # Determine if the audit should proceed if self.stopped: click.echo('\n\nAudit Complete.') return elif click.confirm('\nWould you like to force stop the audit'): click.echo('\n\nAudit Complete: User stopped.') return # Compute kmin if audit has not stopped and truncate distributions self.next_min_winner_ballots(verbose) self.truncate_dist_null() self.truncate_dist_reported() click.echo('\n\nAudit Complete: Reached max sample size.')
[docs] def _reset(self): """Reset attributes modified during run().""" self.rounds = [] self.sample_winner_ballots = [] self.pvalue_schedule = [] for loser in self.sub_audits.keys(): self.sub_audits[loser]._reset() self.sample_ballots = {} for candidate in self.contest.candidates: self.sample_ballots[candidate] = [] self.stopped = False
[docs] @abstractmethod def get_min_sample_size(self, sub_audit: PairwiseAudit): """Get the minimum valid sample size in a sub audit Args: sub_audit (PairwiseAudit): Get minimum sample size for this sub_audit. """ pass
[docs] @abstractmethod def next_sample_size(self, *args, **kwargs): """Generate estimates of possible next sample sizes. Note: To be used during live/interactive audit execution. """ pass
[docs] def stopping_condition(self, verbose: bool = False) -> bool: """Determine if the audits stopping condition has been met. The audit stopping condition is met if and only if each pairwise stopping condition is met. """ stop = True max_pvalue = 0 for pair in self.sub_audits.keys(): # If pairwise audit has already stopped, do not compute round if self.sub_audits[pair].stopped: continue # Evaluate stopping condition for pairwise audit if not self.stopping_condition_pairwise(pair, verbose): stop = False if self.sub_audits[pair].pvalue_schedule[-1] > max_pvalue: max_pvalue = self.sub_audits[pair].pvalue_schedule[-1] self.pvalue_schedule.append(max_pvalue) if verbose: click.echo('\nRisk Level: {}'.format(self.get_risk_level())) return stop
[docs] @abstractmethod def stopping_condition_pairwise(self, pair: str, verbose: bool = False) -> bool: """Determine if pairwise subcontest meets stopping condition. Args: pair (str): Dictionary key referencing pairwise audit in audit's sub_audits. Returns: bool: If the pairwise subaudit has stopped. """ pass
[docs] def next_min_winner_ballots(self, verbose: bool = False): """Compute next stopping size of given (actual) sample sizes for all subaudits.""" if verbose: click.echo('\n+----------------------------------------+') click.echo('|{:^40}|'.format('Minimum Winner Ballots Needed in Round')) click.echo('|{:^40}|'.format('--------------------------------------')) for pair, sub_audit in self.sub_audits.items(): kmin = self.next_min_winner_ballots_pairwise(sub_audit) self.sub_audits[pair].min_winner_ballots.append(kmin) if verbose: if kmin is None: kmin = 'Inf.' click.echo('|{:<30} {:<9}|'.format(pair, kmin)) if verbose: click.echo('\n+----------------------------------------+')
[docs] @abstractmethod def next_min_winner_ballots_pairwise(self, sub_audit: PairwiseAudit) -> int: """Compute next stopping size of given (actual) sample size for a subaudit. Args: sub_audit (PairwiseAudit): Compute next stopping size for this subaudit. Note: To be used during live/interactive audit execution. """ pass
[docs] @abstractmethod def compute_min_winner_ballots(self, sub_audit: PairwiseAudit, progress: bool = False, *args, **kwargs): """Compute the stopping size(s) for any number of sample sizes for a given subaudit.""" pass
[docs] @abstractmethod def compute_all_min_winner_ballots(self, sub_audit: PairwiseAudit, progress: bool = False, *args, **kwargs): """Compute all stopping sizes from the minimum sample size on for a given subaudit.""" pass
[docs] @abstractmethod def compute_risk(self, sub_audit: PairwiseAudit, *args, **kwargs): """Compute the current risk level of a given subaudit. Returns: Current risk level of the audit (as defined per audit implementation). """ pass
[docs] @abstractmethod def get_risk_level(self, *args, **kwargs): """Find the risk level of the audit, that is, the smallest risk limit for which the audit as it has panned out would have already stopped. Returns: float: Risk level of the realization of the audit. """ pass