Source code for openqaoa.algorithms.qaoa.qaoa_result

from __future__ import annotations
from typing import Type, List, Union, TYPE_CHECKING

import copy
import numpy as np
import matplotlib.pyplot as plt

from ...qaoa_components import Hamiltonian

if TYPE_CHECKING:
    from ...optimizers.logger_vqa import Logger
from ...utilities import (
    qaoa_probabilities,
    bitstring_energy,
    convert2serialize,
    delete_keys_from_dict,
)
from ...backends.basebackend import QAOABaseBackend, QAOABaseBackendStatevector
from ...backends.qaoa_analytical_sim import QAOABackendAnalyticalSimulator


def most_probable_bitstring(cost_hamiltonian, measurement_outcomes):
    """
    Computes the most probable bitstring(s) from the measurement outcomes and the energy of the first one.
    """
    mea_out = list(measurement_outcomes.values())
    index_likeliest_states = np.argwhere(mea_out == np.max(mea_out))
    # degeneracy = len(index_likeliest_states)
    solutions_bitstrings = [
        list(measurement_outcomes.keys())[e[0]] for e in index_likeliest_states
    ]

    return {
        "solutions_bitstrings": solutions_bitstrings,
        "bitstring_energy": bitstring_energy(cost_hamiltonian, solutions_bitstrings[0]),
    }
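
# Illustrative sketch (not part of the library source): `most_probable_bitstring`
# looks up the outcome(s) with the highest count and evaluates the cost
# Hamiltonian on the first of them. With a hypothetical counts dictionary it
# would behave roughly like this:
#
#     counts = {"010": 620, "101": 380}
#     info = most_probable_bitstring(cost_hamiltonian, counts)
#     info["solutions_bitstrings"]   # -> ["010"]
#     info["bitstring_energy"]       # energy of "010" under cost_hamiltonian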


class QAOAResult:
    """
    A class to handle the results of QAOA workflows

    Parameters
    ----------
    log: `Logger`
        The raw logger generated from the training vqa part of the QAOA.
    method: `str`
        Stores the name of the optimisation used by the classical optimiser
    cost_hamiltonian: `Hamiltonian`
        The cost Hamiltonian for the problem statement
    type_backend: `QAOABaseBackend`
        The type of backend used for the experiment
    """

    def __init__(
        self,
        log: Logger,
        method: Type[str],
        cost_hamiltonian: Type[Hamiltonian],
        type_backend: Type[QAOABaseBackend],
    ):
        """
        init method
        """
        self.__type_backend = type_backend
        self.method = method
        self.cost_hamiltonian = cost_hamiltonian

        self.evals = {
            "number_of_evals": log.func_evals.best[0],
            "jac_evals": log.jac_func_evals.best[0],
            "qfim_evals": log.qfim_func_evals.best[0],
        }

        self.intermediate = {
            "angles": np.array(log.param_log.history).tolist(),
            "cost": log.cost.history,
            "measurement_outcomes": log.measurement_outcomes.history,
            "job_id": log.job_ids.history,
        }

        self.optimized = {
            "angles": np.array(log.param_log.best[0]).tolist()
            if log.param_log.best != []
            else [],
            "cost": log.cost.best[0] if log.cost.best != [] else None,
            "measurement_outcomes": log.measurement_outcomes.best[0]
            if log.measurement_outcomes.best != []
            else {},
            "job_id": log.job_ids.best[0] if len(log.job_ids.best) != 0 else [],
            "eval_number": log.eval_number.best[0]
            if len(log.eval_number.best) != 0
            else [],
        }

        self.most_probable_states = (
            most_probable_bitstring(
                cost_hamiltonian, self.get_counts(log.measurement_outcomes.best[0])
            )
            if type_backend != QAOABackendAnalyticalSimulator
            and log.measurement_outcomes.best != []
            else []
        )

    # def __repr__(self):
    #     """Return an overview over the parameters and hyperparameters
    #     Todo
    #     ----
    #     Split this into ``__repr__`` and ``__str__`` with a more verbose
    #     output in ``__repr__``.
    #     """
    #     string = "Optimization Results:\n"
    #     string += "\tThe solution is " + str(self.solution['degeneracy']) + " degenerate" "\n"
    #     string += "\tThe most probable bitstrings are: " + str(self.solution['bitstring']) + "\n"
    #     string += "\tThe associated cost is: " + str(self.optimized['cost']) + "\n"
    #     return (string)

        # if we are using a shot adaptive optimizer, we need to add the number of shots to the result
        if log.n_shots.history != []:
            self.n_shots = log.n_shots.history
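
    # Usage sketch (not part of the library source): a QAOAResult is normally not
    # constructed by hand; it is created from the optimizer's Logger at the end of
    # a workflow and exposed as the workflow's `result` attribute, roughly like this
    # (assuming the standard OpenQAOA workflow API):
    #
    #     q = QAOA()
    #     q.compile(problem)
    #     q.optimize()
    #     result = q.result               # -> QAOAResult instance
    #     result.optimized["cost"]        # best cost found by the optimizer
    #     result.most_probable_states     # output of most_probable_bitstring()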

    def asdict(
        self,
        keep_cost_hamiltonian: bool = True,
        complex_to_string: bool = False,
        intermediate_measurements: bool = True,
        exclude_keys: List[str] = [],
    ):
        """
        Returns a dictionary with the results of the optimization, where the dictionary is serializable.
        If the backend is a statevector backend, the measurement outcomes will be the statevector,
        meaning that it is a list of complex numbers, which is not serializable.
        If that is the case, and complex_to_string is True, the complex numbers are converted to strings.

        Parameters
        ----------
        keep_cost_hamiltonian: `bool`
            If True, the cost Hamiltonian is kept in the dictionary. If False, it is removed.
        complex_to_string: `bool`
            If True, the complex numbers are converted to strings. If False, they are kept as complex numbers.
            This is useful for JSON serialization.
        intermediate_measurements: `bool`, optional
            If True, intermediate measurements are included in the dump.
            If False, intermediate measurements are not included in the dump.
            Default is True.
        exclude_keys: `list[str]`
            A list of keys to exclude from the returned dictionary.

        Returns
        -------
        return_dict: `dict`
            A dictionary with the results of the optimization, where the dictionary is serializable.
        """

        return_dict = {}
        return_dict["method"] = self.method
        if keep_cost_hamiltonian:
            return_dict["cost_hamiltonian"] = convert2serialize(self.cost_hamiltonian)
        return_dict["evals"] = self.evals
        return_dict["most_probable_states"] = self.most_probable_states

        complex_to_str = (
            lambda x: str(x)
            if isinstance(x, np.complex128) or isinstance(x, complex)
            else x
        )

        # if the backend is a statevector backend, the measurement outcomes will be the statevector,
        # meaning that it is a list of complex numbers, which is not serializable.
        # If that is the case, and complex_to_string is True, the complex numbers are converted to strings.
        if complex_to_string:
            return_dict["intermediate"] = {}
            for key, value in self.intermediate.items():
                # Measurements and cost may require casting
                if "measurement" in key:
                    if len(value) > 0:
                        if intermediate_measurements is False:
                            # if intermediate_measurements is False, the intermediate measurements are not included
                            return_dict["intermediate"][key] = []
                        elif isinstance(
                            value[0], np.ndarray
                        ):  # statevector -> convert complex to str
                            return_dict["intermediate"][key] = [
                                [complex_to_str(item) for item in list_]
                                for list_ in value
                                if (
                                    isinstance(list_, list)
                                    or isinstance(list_, np.ndarray)
                                )
                            ]
                        else:  # all other cases -> cast numpy integer counts into Python ints
                            return_dict["intermediate"][key] = [
                                {k_: int(v_) for k_, v_ in v.items()} for v in value
                            ]
                    else:
                        pass
                elif "cost" == key and (
                    isinstance(value[0], np.float64)
                    or isinstance(value[0], np.float32)
                ):
                    return_dict["intermediate"][key] = [float(item) for item in value]
                else:
                    return_dict["intermediate"][key] = value

            return_dict["optimized"] = {}
            for key, value in self.optimized.items():
                # if wavefunction, convert complex to str
                if "measurement" in key and (
                    isinstance(value, list) or isinstance(value, np.ndarray)
                ):
                    return_dict["optimized"][key] = [
                        complex_to_str(item) for item in value
                    ]
                # if dictionary, convert measurement values to integers
                elif "measurement" in key and (isinstance(value, dict)):
                    return_dict["optimized"][key] = {
                        k: int(v) for k, v in value.items()
                    }
                else:
                    return_dict["optimized"][key] = value

                if "cost" in key and (
                    isinstance(value, np.float64) or isinstance(value, np.float32)
                ):
                    return_dict["optimized"][key] = float(value)
        else:
            return_dict["intermediate"] = self.intermediate
            return_dict["optimized"] = self.optimized

        # if we are using a shot adaptive optimizer, we need to add the number of shots to the result,
        # so if the attribute n_shots is not empty, it is added to the dictionary
        if getattr(self, "n_shots", None) is not None:
            return_dict["n_shots"] = self.n_shots

        return (
            return_dict
            if exclude_keys == []
            else delete_keys_from_dict(return_dict, exclude_keys)
        )
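
    # Usage sketch (not part of the library source): `asdict` is meant to make the
    # result JSON-serializable, e.g. for dumping to disk. For a statevector backend,
    # `complex_to_string=True` is what keeps json.dumps from failing on complex amplitudes:
    #
    #     import json
    #     serializable = result.asdict(complex_to_string=True,
    #                                  intermediate_measurements=False)
    #     with open("qaoa_result.json", "w") as f:
    #         json.dump(serializable, f)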

    @classmethod
    def from_dict(
        cls, dictionary: dict, cost_hamiltonian: Union[Hamiltonian, None] = None
    ):
        """
        Creates a Results object from a dictionary (which is the output of the asdict method)

        Parameters
        ----------
        dictionary: `dict`
            The dictionary to create the QAOA Result object from

        Returns
        -------
        `Result`
            The Result object created from the dictionary
        """

        # deepcopy the dictionary, so that the original dictionary is not changed
        dictionary = copy.deepcopy(dictionary)

        # create a new instance of the class
        result = cls.__new__(cls)

        # set the attributes of the new instance, using the dictionary
        for key, value in dictionary.items():
            setattr(result, key, value)

        # if there is an input cost hamiltonian, it is added to the result
        if cost_hamiltonian is not None:
            result.cost_hamiltonian = cost_hamiltonian

        # if the measurement_outcomes are strings, they are converted to complex numbers
        if not isinstance(
            result.optimized["measurement_outcomes"], dict
        ) and isinstance(result.optimized["measurement_outcomes"][0], str):
            for i in range(len(result.optimized["measurement_outcomes"])):
                result.optimized["measurement_outcomes"][i] = complex(
                    result.optimized["measurement_outcomes"][i]
                )
            for i in range(len(result.intermediate["measurement_outcomes"])):
                for j in range(len(result.intermediate["measurement_outcomes"][i])):
                    result.intermediate["measurement_outcomes"][i][j] = complex(
                        result.intermediate["measurement_outcomes"][i][j]
                    )

        # if the measurement_outcomes are complex numbers, the backend is set to QAOABaseBackendStatevector
        if not isinstance(
            result.optimized["measurement_outcomes"], dict
        ) and isinstance(result.optimized["measurement_outcomes"][0], complex):
            setattr(result, "_QAOAResult__type_backend", QAOABaseBackendStatevector)
        else:
            setattr(result, "_QAOAResult__type_backend", "")

        # return the object
        return result
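
    # Usage sketch (not part of the library source): `from_dict` is the inverse of
    # `asdict`, so a result can be round-tripped through a plain dictionary. Note that
    # the serialized cost Hamiltonian stays a plain dict unless a Hamiltonian object
    # is re-attached explicitly via the optional argument:
    #
    #     as_dict = result.asdict(complex_to_string=True)
    #     restored = QAOAResult.from_dict(as_dict,
    #                                     cost_hamiltonian=result.cost_hamiltonian)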

    @staticmethod
    def get_counts(measurement_outcomes):
        """
        Converts a statevector (numpy array) of measurement outcomes into a probability
        dictionary; count dictionaries are returned unchanged.

        Parameters
        ----------
        measurement_outcomes: `Union[np.array, dict]`
            The measurement outcome as returned by the Logger.
            It can either be a statevector or a count dictionary.

        Returns
        -------
        `dict`
            The count dictionary obtained either through the statevector or the actual measurement counts.
        """

        if isinstance(measurement_outcomes, type(np.array([]))):
            measurement_outcomes = qaoa_probabilities(measurement_outcomes)

        return measurement_outcomes
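
    # Illustrative sketch (not part of the library source): for a statevector,
    # `get_counts` delegates to `qaoa_probabilities`, which yields the Born-rule
    # probability of each basis state; dictionaries pass through unchanged. Roughly:
    #
    #     sv = np.array([1 / np.sqrt(2), 0, 0, 1 / np.sqrt(2)])   # hypothetical 2-qubit state
    #     QAOAResult.get_counts(sv)          # -> probabilities keyed by bitstring, ~{"00": 0.5, ..., "11": 0.5}
    #     QAOAResult.get_counts({"00": 10})  # -> {"00": 10}, unchanged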

    def plot_cost(
        self, figsize=(10, 8), label="Cost", linestyle="--", color="b", ax=None
    ):
        """
        A simple helper function to plot the cost associated to a QAOA workflow

        Parameters
        ----------
        figsize: `tuple`
            The size of the figure to be plotted. Defaults to (10, 8).
        label: `str`
            The label of the cost line, defaults to 'Cost'.
        linestyle: `str`
            The linestyle of the plot. Defaults to '--'.
        color: `str`
            The color of the line. Defaults to 'b'.
        ax: `matplotlib.axes._subplots.AxesSubplot`
            Axis on which to plot the graph. Defaults to None.

        Returns
        -------
        fig: `matplotlib.figure.Figure`
            The figure object
        ax: `matplotlib.axes._subplots.AxesSubplot`
            The axis object
        """
        if ax is None:
            fig, ax = plt.subplots(figsize=figsize)
        else:
            fig = ax.get_figure()

        ax.plot(
            range(
                1,
                self.evals["number_of_evals"]
                - self.evals["jac_evals"]
                - self.evals["qfim_evals"]
                + 1,
            ),
            self.intermediate["cost"],
            label=label,
            linestyle=linestyle,
            color=color,
        )

        ax.set_ylabel("Cost")
        ax.set_xlabel("Number of function evaluations")
        ax.legend()
        ax.set_title("Cost history")

        return fig, ax
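
    # Usage sketch (not part of the library source): the plotting helpers return the
    # matplotlib figure and axis, so they compose with user-made figures, e.g.:
    #
    #     fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    #     result.plot_cost(ax=axes[0])
    #     result.plot_probabilities(ax=axes[1])
    #     fig.savefig("qaoa_summary.png")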

    def plot_probabilities(
        self,
        n_states_to_keep=None,
        figsize=(10, 8),
        label="Probability distribution",
        color="tab:blue",
        ax=None,
    ):
        """
        Helper function to plot the probabilities corresponding to each basis state
        (with prob != 0) obtained from the optimized result

        Parameters
        ----------
        n_states_to_keep: `int`
            If the user passes a value, the plot will show only that many states.
            Else, an upper bound will be calculated depending on the
            total size of the measurement outcomes.
        figsize: `tuple`
            The size of the figure to be plotted. Defaults to (10, 8).
        label: `str`
            The label of the cost line, defaults to 'Probability distribution'.
        color: `str`
            The color of the line. Defaults to 'tab:blue'.
        ax: `matplotlib.axes._subplots.AxesSubplot`
            Axis on which to plot the graph. Defaults to None.

        Returns
        -------
        fig: `matplotlib.figure.Figure`
            The figure object
        ax: `matplotlib.axes._subplots.AxesSubplot`
            The axis object
        """

        outcome = self.optimized["measurement_outcomes"]

        # converting to counts dictionary if outcome is statevector
        if isinstance(outcome, type(np.array([]))):
            outcome = self.get_counts(outcome)
            # setting norm to 1 since it might differ slightly for statevectors due to numerical precision
            norm = np.float64(1)
        else:
            # needed to be able to divide the tuple by 'norm'
            norm = np.float64(sum(outcome.values()))

        # sorting dictionary. adding a callback function to sort by values instead of keys
        # setting reverse = True to be able to obtain the states with highest counts
        outcome_list = sorted(outcome.items(), key=lambda item: item[1], reverse=True)
        states, counts = zip(*outcome_list)

        # normalizing to obtain probabilities
        probs = counts / norm

        # total number of states / number of states with != 0 counts for shot simulators
        total = len(states)

        # number of states that fit without distortion in figure
        upper_bound = 40
        # default fontsize
        font = "medium"

        if n_states_to_keep:
            if n_states_to_keep > total:
                raise ValueError(
                    "n_states_to_keep must be smaller than or equal to the total number "
                    f"of states in measurement outcome: {total}"
                )
            else:
                if n_states_to_keep > upper_bound:
                    print("number of states_to_keep exceeds the recommended value")
                    font = "small"
        # if n_states_to_keep is not given
        else:
            if total > upper_bound:
                n_states_to_keep = upper_bound
            else:
                n_states_to_keep = total

        # formatting labels
        labels = [
            r"$\left|{}\right>$".format(state) for state in states[:n_states_to_keep]
        ]
        labels.append("rest")

        # represent the last bar with the sum of all the remaining probabilities
        rest = sum(probs[n_states_to_keep:])

        if ax is None:
            fig, ax = plt.subplots(figsize=figsize)
        else:
            fig = ax.get_figure()

        colors = [color for _ in range(n_states_to_keep)] + ["xkcd:magenta"]

        ax.bar(labels, np.append(probs[:n_states_to_keep], rest), color=colors)
        ax.set_xlabel("Eigen-State")
        ax.set_ylabel("Probability")
        ax.set_title(label)
        ax.tick_params(axis="x", labelrotation=75, labelsize=font)
        ax.grid(True, axis="y", linestyle="--")

        print("states kept:", n_states_to_keep)
        return fig, ax
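
    # Usage sketch (not part of the library source): limiting the histogram to the few
    # most likely states keeps the x-axis readable for larger problems, e.g.:
    #
    #     fig, ax = result.plot_probabilities(n_states_to_keep=10)
    #     # the bar labelled "rest" aggregates the probability of all remaining states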

    def plot_n_shots(
        self,
        figsize=(10, 8),
        param_to_plot=None,
        label=None,
        linestyle="--",
        color=None,
        ax=None,
        xlabel="Iterations",
        ylabel="Number of shots",
        title="Evolution of number of shots for gradient estimation",
    ):
        """
        Helper function to plot the evolution of the number of shots used for each evaluation
        of the cost function when computing the gradient.
        It only works for shot-adaptive optimizers: cans and icans.
        If cans was used, the number of shots will be the same for each parameter at each iteration.
        If icans was used, the number of shots could be different for each parameter at each iteration.

        Parameters
        ----------
        figsize: `tuple`
            The size of the figure to be plotted. Defaults to (10, 8).
        param_to_plot: `list[int]` or `int`
            The parameters to plot. If None, all parameters will be plotted. Defaults to None.
            If an int is given, only the parameter with that index will be plotted.
            If a list of ints is given, the parameters with those indexes will be plotted.
        label: `list[str]` or `str`
            The label for each parameter. Defaults to 'Parameter {i}'.
            If only one parameter is plotted, the label can be a string;
            otherwise it must be a list of strings.
        linestyle: `list[str]` or `str`
            The linestyle for each parameter. Defaults to '--' for all parameters.
            If it is a string, all parameters will use it; if it is a list of strings,
            each parameter will use the corresponding linestyle of the list.
        color: `list[str]` or `str`
            The color for each parameter. Defaults to None for all parameters
            (matplotlib will choose the colors).
            If only one parameter is plotted, the color can be a string;
            otherwise it must be a list of strings.
        ax: `matplotlib.axes._subplots.AxesSubplot`
            Axis on which to plot the graph. If none is given, a new figure will be created.
        """
        if ax is None:
            fig, ax = plt.subplots(figsize=figsize)
        else:
            fig = ax.get_figure()

        # creating a list of parameters to plot
        # if param_to_plot is not given, plot all the parameters
        if param_to_plot is None:
            param_to_plot = list(range(len(self.n_shots[0])))
        # if param_to_plot is a single value, convert to list
        elif type(param_to_plot) == int:
            param_to_plot = [param_to_plot]

        # if param_to_plot is not a list, raise error
        if type(param_to_plot) != list:
            raise ValueError(
                "`param_to_plot` must be a list of integers or a single integer"
            )
        else:
            for param in param_to_plot:
                assert param < len(
                    self.n_shots[0]
                ), f"`param_to_plot` must be a list of integers between 0 and {len(self.n_shots[0]) - 1}"

        # if label is not given, create a list of labels for each parameter (only if there is more than 1 parameter)
        if len(self.n_shots[0]) > 1:
            label = (
                [f"Parameter {i}" for i in param_to_plot] if label is None else label
            )
        else:
            label = ["n. shots per parameter"]

        # if only one parameter is plotted, convert label and color to list if they are strings
        if len(param_to_plot) == 1:
            if type(label) == str:
                label = [label]
            if type(color) == str:
                color = [color]

        # if param_to_plot is a list and label or color are not lists, raise error
        if (type(label) != list) or (type(color) != list and color is not None):
            raise TypeError("`label` and `color` must be list of str")

        # if label is a list, check that all the elements are strings
        for lab in label:
            assert type(lab) == str, "`label` must be a list of strings"
        # if color is a list, check that all the elements are strings
        if color is not None:
            for c in color:
                assert type(c) == str, "`color` must be a list of strings"

        # if label and color are lists, check if they have the same length as param_to_plot
        if len(label) != len(param_to_plot) or (
            color is not None and len(color) != len(param_to_plot)
        ):
            raise ValueError(
                "`param_to_plot`, `label` and `color` must have the same length, "
                f"`param_to_plot` is a list of {len(param_to_plot)} elements"
            )

        # linestyle must be a string or a list of strings; if it is a string, convert it to a list of strings
        if type(linestyle) != str and type(linestyle) != list:
            raise TypeError("`linestyle` must be str or list")
        elif type(linestyle) == str:
            linestyle = [linestyle for _ in range(len(param_to_plot))]
        elif len(linestyle) != len(param_to_plot):
            raise ValueError(
                "`linestyle` must have the same length as param_to_plot "
                f"(length of `param_to_plot` is {len(param_to_plot)}), or be a string"
            )
        else:
            for ls in linestyle:
                assert type(ls) == str, "`linestyle` must be a list of strings"

        # plot the evolution of the number of shots for each parameter that is in param_to_plot
        transposed_n_shots = np.array(self.n_shots).T
        for i, values in enumerate([transposed_n_shots[j] for j in param_to_plot]):
            if color is None:
                ax.plot(values, label=label[i], linestyle=linestyle[i])
            else:
                ax.plot(values, label=label[i], linestyle=linestyle[i], color=color[i])

        ax.set_ylabel(ylabel)
        ax.set_xlabel(xlabel)
        ax.legend()
        ax.set_title(title)

        return fig, ax
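
    # Usage sketch (not part of the library source): `plot_n_shots` only makes sense when
    # `self.n_shots` was populated by a shot-adaptive optimizer (cans/icans), e.g.:
    #
    #     result.plot_n_shots()                            # one line per variational parameter
    #     result.plot_n_shots(param_to_plot=[0, 1],
    #                         label=["beta_0", "gamma_0"]) # hypothetical labels for two parameters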

    def lowest_cost_bitstrings(self, n_bitstrings: int = 1) -> dict:
        """
        Find the minimum energies of the cost Hamiltonian given a set of measurement outcomes

        Parameters
        ----------
        n_bitstrings : int
            Number of lowest-energy bitstrings to return

        Returns
        -------
        best_results : dict
            Returns a list of bitstrings with the lowest values of the cost Hamiltonian.
        """

        if isinstance(self.optimized["measurement_outcomes"], dict):
            measurement_outcomes = self.optimized["measurement_outcomes"]
            solution_bitstring = list(measurement_outcomes.keys())
        elif isinstance(self.optimized["measurement_outcomes"], np.ndarray):
            measurement_outcomes = self.get_counts(
                self.optimized["measurement_outcomes"]
            )
            solution_bitstring = list(measurement_outcomes.keys())
        else:
            raise TypeError(
                f"The measurement outcome {type(self.optimized['measurement_outcomes'])} is not valid."
            )
        energies = [
            bitstring_energy(self.cost_hamiltonian, bitstring)
            for bitstring in solution_bitstring
        ]
        args_sorted = np.argsort(energies)
        if n_bitstrings > len(energies):
            n_bitstrings = len(energies)

        total_shots = sum(measurement_outcomes.values())
        best_results = {
            "solutions_bitstrings": [
                solution_bitstring[args_sorted[ii]] for ii in range(n_bitstrings)
            ],
            "bitstrings_energies": [
                energies[args_sorted[ii]] for ii in range(n_bitstrings)
            ],
            "probabilities": [
                measurement_outcomes[solution_bitstring[args_sorted[ii]]] / total_shots
                for ii in range(n_bitstrings)
            ],
        }

        return best_results
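
    # Usage sketch (not part of the library source): unlike `most_probable_states`, which
    # ranks outcomes by measurement frequency, this helper ranks the sampled bitstrings by energy:
    #
    #     best = result.lowest_cost_bitstrings(n_bitstrings=3)
    #     for b, e, p in zip(best["solutions_bitstrings"],
    #                        best["bitstrings_energies"],
    #                        best["probabilities"]):
    #         print(b, e, p)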

    def calculate_statistics(self, include_intermediate=False) -> dict:
        """
        A function to calculate statistics of measurement outcomes associated with a QAOA workflow

        Parameters
        ----------
        include_intermediate: `bool`
            Whether it is necessary to calculate statistics for intermediate results. Defaults to False.
        """
        if (
            len(self.intermediate["measurement_outcomes"]) == 0
            and include_intermediate == True
        ):
            raise ValueError(
                "The underlying QAOA object does not seem to have any intermediate measurement result. Please, consider saving "
                "intermediate measurements during optimization by setting `optimization_progress=True` in your workflow."
            )

        if isinstance(self.optimized["measurement_outcomes"], dict):
            optimized_measurement_outcomes = self.optimized["measurement_outcomes"]
        elif isinstance(self.optimized["measurement_outcomes"], np.ndarray):
            optimized_measurement_outcomes = self.get_counts(
                self.optimized["measurement_outcomes"]
            )
        else:
            raise TypeError(
                f"The measurement outcome {type(self.optimized['measurement_outcomes'])} is not valid."
            )

        if isinstance(self.intermediate["measurement_outcomes"], list):
            if len(self.intermediate["measurement_outcomes"]) > 0:
                if isinstance(self.intermediate["measurement_outcomes"][0], dict):
                    intermediate_measurement_outcomes = self.intermediate[
                        "measurement_outcomes"
                    ]
                elif isinstance(
                    self.intermediate["measurement_outcomes"][0], np.ndarray
                ):
                    intermediate_measurement_outcomes = [
                        self.get_counts(i)
                        for i in self.intermediate["measurement_outcomes"]
                    ]
                else:
                    raise TypeError(
                        f"The measurement outcome {type(self.intermediate['measurement_outcomes'][0])} is not valid."
                    )
        else:
            raise TypeError(
                f"The measurement outcome {type(self.intermediate['measurement_outcomes'])} is not valid."
            )

        def sorted_mean_std_deviation(counts: dict):
            values = list(counts.values())
            return {
                "sorted": dict(
                    sorted(counts.items(), key=lambda x: x[1], reverse=True)
                ),
                "mean": np.mean(values),
                "std_deviation": np.std(values),
            }

        return {
            "intermediate": [
                sorted_mean_std_deviation(i)
                for i in intermediate_measurement_outcomes
            ]
            if include_intermediate
            else [],
            "optimized": sorted_mean_std_deviation(optimized_measurement_outcomes),
        }
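
    # Usage sketch (not part of the library source): the returned dictionary gives, for the
    # optimized (and optionally each intermediate) measurement outcome, the counts sorted by
    # frequency together with their mean and standard deviation:
    #
    #     stats = result.calculate_statistics()
    #     stats["optimized"]["sorted"]          # counts/probabilities, most frequent state first
    #     stats["optimized"]["mean"]            # mean count/probability per sampled state
    #     stats["optimized"]["std_deviation"]   # spread of the distribution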