Source code for openqaoa_braket.backends.qaoa_braket_qpu

import os
from copy import deepcopy
from typing import Optional, List
import warnings

from braket.circuits import Circuit
from braket.circuits.gates import H
from braket.circuits.result_types import Probability
from braket.circuits.free_parameter import FreeParameter
from braket.jobs.metrics import log_metric

from .devices import DeviceAWS
from .gates_braket import BraketGateApplicator
from openqaoa.backends.basebackend import (
    QAOABaseBackendShotBased,
    QAOABaseBackendCloud,
    QAOABaseBackendParametric,
)
from openqaoa.qaoa_components import QAOADescriptor
from openqaoa.qaoa_components.variational_parameters.variational_baseparams import (
    QAOAVariationalBaseParams,
)
from openqaoa.utilities import permute_counts_dictionary


[docs]class QAOAAWSQPUBackend( QAOABaseBackendParametric, QAOABaseBackendCloud, QAOABaseBackendShotBased ): """ A QAOA simulator as well as for real QPU using Amazon Braket as backend Parameters ---------- device: `DeviceAWS` An object of the class ``DeviceAWS`` which contains the credentials for accessing the QPU via cloud and the name of the device. qaoa_descriptor: `QAOADescriptor` An object of the class ``QAOADescriptor`` which contains information on circuit construction and depth of the circuit. n_shots: `int` The number of shots to be taken for each circuit. prepend_state: `QuantumCircuit` The state prepended to the circuit. append_state: `QuantumCircuit` The state appended to the circuit. init_hadamard: `bool` Whether to apply a Hadamard gate to the beginning of the QAOA part of the circuit. cvar_alpha: `float` The value of alpha for the CVaR method. disable_qubit_rewiring: `bool` A boolean that determines whether qubit routing on the provider's end is used. This is False by default. Not all providers provide this feature. """ def __init__( self, qaoa_descriptor: QAOADescriptor, device: DeviceAWS, n_shots: int, prepend_state: Optional[Circuit], append_state: Optional[Circuit], init_hadamard: bool, cvar_alpha: float, disable_qubit_rewiring: bool = False, initial_qubit_mapping: Optional[List[int]] = None, ): QAOABaseBackendShotBased.__init__( self, qaoa_descriptor, n_shots, prepend_state, append_state, init_hadamard, cvar_alpha, ) QAOABaseBackendCloud.__init__(self, device) self.gate_applicator = BraketGateApplicator() self.qureg = list(range(self.n_qubits)) self.problem_reg = self.qureg[0 : self.problem_qubits] if self.initial_qubit_mapping is None: self.initial_qubit_mapping = ( initial_qubit_mapping if initial_qubit_mapping is not None else list(range(self.n_qubits)) ) else: if isinstance(initial_qubit_mapping, list): warnings.warn( "Ignoring the initial_qubit_mapping since the routing algorithm chose one" ) else: pass self.disable_qubit_rewiring = disable_qubit_rewiring if self.prepend_state: assert self.n_qubits >= len(prepend_state.qubits), ( "Cannot attach a bigger circuit" " to the QAOA routine" ) # if self.device.provider_connected and self.device.qpu_connected: # self.backend_qpu = self.device.backend_device if ( self.device.provider_connected is True and self.device.qpu_connected is False ): raise Exception( "Connection to AWS was made. Error connecting to the specified backend." ) elif ( self.device.provider_connected is True and self.device.qpu_connected is None ): raise Exception( "Connection to AWS was made. A device name was not specified." ) elif not (self.device.provider_connected and self.device.qpu_connected): raise Exception("Error connecting to AWS.") if self.device.n_qubits < self.n_qubits: raise Exception( "There are lesser qubits on the device than the number of qubits required for the circuit." ) self.parametric_circuit = self.parametric_qaoa_circuit
[docs] def qaoa_circuit(self, params: QAOAVariationalBaseParams) -> Circuit: """ The final QAOA circuit to be executed on the QPU. Parameters ---------- params: `QAOAVariationalBaseParams` Returns ------- qaoa_circuit: `Circuit` The final QAOA circuit constructed using the angles from variational params. """ parametric_circuit = deepcopy(self.parametric_circuit) if self.append_state: parametric_circuit += self.append_state parametric_circuit += Probability.probability() angles_list = self.obtain_angles_for_pauli_list(self.abstract_circuit, params) memory_map = dict( zip( [ each_free_param_obj.name for each_free_param_obj in self.braket_parameter_list ], angles_list, ) ) circuit_with_angles = parametric_circuit.make_bound_circuit(memory_map) return circuit_with_angles
@property def parametric_qaoa_circuit(self) -> Circuit: """ Creates a parametric QAOA circuit, given the qubit pairs, single qubits with biases, and a set of circuit angles. Note that this function does not actually run the circuit. """ parametric_circuit = Circuit() if self.prepend_state: parametric_circuit += self.prepend_state # Initial state is all |+> if self.init_hadamard: for each_qubit in self.problem_reg: parametric_circuit += H.h(each_qubit) self.braket_parameter_list = [] for each_gate in self.abstract_circuit: # if gate is of type mixer or cost gate, assign parameter to it if each_gate.gate_label.type.value in ["MIXER", "COST"]: angle_param = FreeParameter(each_gate.gate_label.__repr__()) self.braket_parameter_list.append(angle_param) each_gate.angle_value = angle_param decomposition = each_gate.decomposition("standard") # using the list above, construct the circuit for each_tuple in decomposition: gate = each_tuple[0](self.gate_applicator, *each_tuple[1]) gate.apply_gate(parametric_circuit) return parametric_circuit
[docs] def get_counts(self, params: QAOAVariationalBaseParams, n_shots=None) -> dict: """ Execute the circuit and obtain the counts Parameters ---------- params: QAOAVariationalBaseParams The QAOA parameters - an object of one of the parameter classes, containing variable parameters. n_shots: int The number of times to run the circuit. If None, n_shots is set to the default: self.n_shots. Returns ------- A dictionary with the bitstring as the key and the number of counts as its value. """ n_shots = self.n_shots if n_shots == None else n_shots circuit = self.qaoa_circuit(params) job_state = False no_of_job_retries = 0 max_job_retries = 5 while job_state == False: job = self.device.backend_device.run( circuit, (self.device.s3_bucket_name, self.device.folder_name), shots=n_shots, disable_qubit_rewiring=self.disable_qubit_rewiring, ) try: self.job_id = job.id job_result = job.result() # If there was an issue with the job sent, send again. if job.state() in ["FAILED", "CANCELLED"] or job_result == None: raise ValueError counts = job_result.measurement_counts except ValueError: print("The task has failed or was cancelled by AWS. Resending task.") no_of_job_retries += 1 except Exception as e: print(e, "\n") print( "An unknown error occurred while trying to retrieve task results. Resending task." ) no_of_job_retries += 1 else: job_state = True finally: if no_of_job_retries >= max_job_retries: raise ConnectionError( "An Error Occurred with the Task(s) sent to AWS." ) # # Expose counts if self.final_mapping is not None: counts = permute_counts_dictionary(counts, self.final_mapping) final_counts = {key[: self.problem_qubits]: 0 for key in counts.keys()} for key, value in counts.items(): final_counts[key[: self.problem_qubits]] += value self.measurement_outcomes = final_counts return final_counts
[docs] def log_with_backend(self, metric_name: str, value, iteration_number) -> None: """ If using AWS Jobs, these values will be logged. """ try: if os.environ["AMZN_BRAKET_JOB_NAME"] is not None: in_jobs = True except KeyError: in_jobs = False if in_jobs: log_metric( metric_name=metric_name, value=value, iteration_number=iteration_number, )
[docs] def circuit_to_qasm(self, params: QAOAVariationalBaseParams) -> str: """ A method to convert the entire QAOA `QuantumCircuit` object into a OpenQASM string """ raise NotImplementedError()
# qasm_string = self.qaoa_circuit(params).qasm(formatted=True) # return qasm_string
[docs] def reset_circuit(self): """ Reset self.circuit after performing a computation """ raise NotImplementedError()