Source code for f2xba.optimization.optimize

"""Implementation of Optimize Base Class.

Support Optimization using gurobipy and COBRApy interfaces.

Load SBML coded metabolic model via sbmlxdf
in case of gurobipy mode: Configure gp model:
    - variables
    - constraints
    - objective function

collect data structures required during optimization and results processing

Peter Schubert, HHU Duesseldorf, CCB, June 2024
"""

import re
import os
import time
import numpy as np
import pandas as pd
from collections import defaultdict
import sbmlxdf
import f2xba.prefixes as pf
from f2xba.utils.mapping_utils import get_srefs, parse_reaction_string, valid_sbml_sid

# gurobipy should not be a hard requirement, unless used in this context
try:
    import gurobipy as gp
except ImportError:
    gp = None
    pass

XML_SPECIES_NS = 'http://www.hhu.de/ccb/rba/species/ns'
XML_COMPARTMENT_NS = 'http://www.hhu.de/ccb/rba/compartment/ns'

status2text = {1: 'LOADED', 2: 'OPTIMAL', 3: 'INFEASIBLE', 4: 'INF_OR_UNBD', 5: 'UNBOUNDED',
               6: 'CUTOFF', 7: 'ITERATION_LIMIT', 8: 'NODE_LIMIT', 9: 'TIME_LIMIT',
               10: 'SOLUTION_LIMIT', 11: 'INTERRUPTED', 12: 'NUMERIC', 13: 'SUBOPTIMAL',
               14: 'INPROGRESS', 15: 'USER_OBJ_LIMIT', 16: 'WORK_LIMIT', 17: 'MEM_LIMIT'}

def extract_net_fluxes(fluxes):
    """Extract net fluxes reactions fluxes for optimization solution.

    In extended models, reactions get split per direction and isoenzyme.
    Here, the fluxes are collected on the level of the original net reaction.

    :param fluxes: reaction fluxes from optimization solution
    :type fluxes: pandas.Series or None
    :return: net fluxes, by combining fluxes of related reactions.
    :rtype: pandas.Series or None
    """
    if fluxes is not None:
        net_fluxes = defaultdict(float)
        for rid, flux in fluxes.items():
            rid = str(rid)
            if re.match('.*_REV$', rid):
                net_fluxes[re.sub('_REV$', '', rid)] -= flux
            else:
                net_fluxes[rid] += flux
        return pd.Series(net_fluxes, name='fluxes')
    else:
        return fluxes


[docs] class Solution: """Structure holding information from optimization result. Aligned to COBRApy cobra.Solution. Example: Load and optimize a GECKO model using the gurobipy interface. .. code-block:: python from f2xba import EcmOptimization eo = EcmOptimization('iML1515_GECKO.xml') eo.medium = {rid: 1000.0 for rid in lb_medium} solution = eo.optimize() solution.objective_value Code for __repr__(), _repr_html_() copied from cobra.Solution. """ def __init__(self, status, objective_value=np.nan, fluxes=None, reduced_costs=None, shadow_prices=None): self.status = status """Status string from optimization.""" self.objective_value = objective_value """Objective value (float).""" self.fluxes = fluxes """Values of optimization variables (pandas Series).""" self.reduced_costs = reduced_costs """Reduced cost of optimization variables (pandas Series).""" self.shadow_prices = shadow_prices """Shadow prices of constraints (pandas Series).""" def __repr__(self): if self.status != 'optimal': return f'<Solution {self.status} at {id(self):#x}>' return f'<Solution {self.objective_value:.3f} at {id(self):#x}>' def _repr_html_(self): if self.status == 'optimal': with pd.option_context('display.max_rows', 10): df = pd.DataFrame({'fluxes': self.fluxes, 'reduced_costs': self.reduced_costs}) html = ('<strong><em>Optimal</em> solution with objective ' f'value {self.objective_value:.3f}</strong><br>{df._repr_html_()}') else: html = f'<strong><em>{self.status}</em> solution</strong>' return html
[docs] class Optimize: """Support optimization of SBML encoded extended models generated by f2xba. `Optimize` is parent to `FbaOptimization`, `EcmOptimization` and `RbaOptimization` classes. `Optimize` provides common functionality, including support for thermodynamics constraint model variants. Both COBRApy and gurobipy interfaces are supported. Use of gurobipy interface requires GUROBI and gurobipy being installed on your system. The gurobipy interface has significant speed advantages, especially for larger amd more complex models. Example 1: Load and optimize GECKO model using the COBRApy interface. .. code-block:: python import cobra from f2xba import EcmOptimization ecm = cobra.io.read_sbml_model('iML1515_GECKO.xml') eo = EcmOptimization('iML1515_GECKO.xml', ecm) ecm.medium = {rid: 1000.0 for rid in medium} solution = ecm.optimize() Example 2: Load and optimize GECKO model using the gurobipy interface. .. code-block:: python from f2xba import EcmOptimization eo = EcmOptimization('iML1515_GECKO.xml') eo.medium = {rid: 1000.0 for rid in medium} solution = eo.optimize() """ def __init__(self, model_type, fname, cobra_model=None): """Instantiate the Optimize base class This instantiation is invoked by child classes. The SBML-coded extended model is loaded from file, and information is extracted. A gurobipy model (gpm) is created, if gurobipy is available and no COBRApy model is provided. For TD constraint models, thermodynamics related variables and constraints will be reconfigured. :param str model_type: base type of the model ('FBA', 'ECM' or 'RBA') :param str fname: filename of the SBML coded extended model :param cobra_model: reference to cobra model (default: None) :type cobra_model: :class:`cobra.Model` """ self.model_type = model_type if not os.path.exists(fname): print(f'Error: {fname} not found!') raise FileNotFoundError self.model_name = os.path.basename(fname).split('.')[0] # load SBML coded metabolic model into sbmlxdf for data extraction sbml_model = sbmlxdf.Model(fname) print(f'SBML model loaded by sbmlxdf: {fname} ({time.ctime(os.path.getmtime(fname))})') self.m_dict = sbml_model.to_df() """Information extracted from the SBML encoded model.""" self.gpm = None """Instance of gurobipy.Model, in case gurobipy interface is used.""" self.orig_gpm = None self.gp_neg_var = {} sbml_parameters = self.m_dict['parameters']['value'].to_dict() self.avg_enz_saturation = sbml_parameters.get('avg_enz_sat') """Average enzyme saturation level configured in ECM and RBA models.""" self.protein_per_gdw = sbml_parameters.get('gP_per_gDW') """Total protein per gram cell dry weight (gP/gDW) configured in ECM models.""" self.modeled_protein_mf = sbml_parameters.get('gmodeledP_per_gP') """Protein mass fraction explicitly modelled in the ECM.""" self.importer_km = sbml_parameters.get('default_importer_km_value') """Default Michaelis Menten constant KM (mmol/l) configured in the RBA model.""" if cobra_model: self.is_gpm = False self.model = cobra_model self._cp_configure_td_model_constraints() else: self.is_gpm = True self.var_id2gpm = {} self.constr_id2gpm = {} self.gpm = self._gp_create_model() self.report_model_size() self.locus2uid = self._get_locus2uid() """Map gene identifier to UniProt protein identifier.""" self.mid2name = {re.sub(f'^{pf.M_}', '', sid): row['name'] for sid, row in self.m_dict['species'].iterrows()} self.id2groups = self._get_id2groups() """Map reaction identifier to group names configured in the SBML model.""" self.rdata_model = self._get_reaction_data() """Information related to reactions.""" self.uptake_rids = [rid for rid, rdata in self.rdata_model.items() if rdata['is_exchange'] is True] """Exchange reactions used for exchange of metabolites with the environment.""" self.ex_rid2sid = self._get_ex_rid2sid() self.sidx2ex_ridx = {re.sub(f'^{pf.M_}', '', sid): re.sub(f'^{pf.R_}', '', rid) for rid, sid in self.ex_rid2sid.items()} # rdata and net_rdata used in results processing (COBRApy ids, without 'R_', 'M_' self.rdata = {} """Information related to reactions (prefixes `R_` and `M_` removed).""" for rid, record in self.rdata_model.items(): cp_rdata = record.copy() cp_rdata['net_rid'] = re.sub(f'^{pf.R_}', '', record['net_rid']) cp_rdata['reaction_str'] = re.sub(r'\b' + f'{pf.M_}', '', record['reaction_str']) self.rdata[re.sub(f'^{pf.R_}', '', rid)] = cp_rdata self.net_rdata = self._extract_net_reaction_data(self.rdata) """Information related to net reactions (iso-reactions and fwd/rev directions combined).""" self.rids_catalyzed = self.get_rids_catalyzed() """Map reaction identifier to related genes identifiers.""" # self.uid2locus = {uid: locus for locus, uid in self.locus2uid.items()} def _get_id2groups(self): """Extract Groups component information from SBML model to map ids to group names. Optional SBML Groups package may contain assignemnt of reaction ids to specific groups. :return: mapping of model identifies to group names as per GROUPS component :rtype: dict (key: model id / str, val: list if group names / str) """ id2groups = defaultdict(list) if 'groups' in self.m_dict: for _, row in self.m_dict['groups'].iterrows(): members_list = row.get('members') if type(members_list) is str: members_list = row.get('members', '') for item in members_list.split(';'): if '=' in item: reference_id = item.split('=')[1].strip() id2groups[reference_id].append(row['name']) return dict(id2groups) def _get_locus2uid(self): """Extract mapping for gene locus to protein id from SBML model. For EC models (e.g. GECKO) extract mapping - from protein concentration variables 'V_PC_xxx', which carry the protein name in the id - from fbcGeneProdAssoc of such variable extract the (single) gene product id - from fbcGeneProducts component extract the related 'label' - gene locus. for RBA models extract mapping from - from macromolecular coupling constraints 'MM_prot_xxx, which carry the gene locus in the name - from miriamAnnotation extract protein id, if available (not available for e.g. RNA or dummy protein) :return: mapping of gene locus to protein identifier :rtype: dict (key: gene locus ('label' of fbaGeneProducts), val: protein id) """ locus2uid = {} # support for EC based models, e.g. GECKO for varid, row in self.m_dict['reactions'].iterrows(): if re.match(f'{pf.V_PC_}', varid): uid = re.sub(f'{pf.V_PC_}', '', varid) gpr = row['fbcGeneProdAssoc'] if type(gpr) is str: gpid = gpr.split('=')[1] label = valid_sbml_sid(self.m_dict['fbcGeneProducts'].at[gpid, 'label']) locus2uid[label] = uid # support for RBA based models for constr_id, row in self.m_dict['species'].iterrows(): if re.match(f'{pf.MM_}', constr_id): uids = sbmlxdf.misc.get_miriam_refs(row['miriamAnnotation'], 'uniprot') if len(uids) == 1: label = re.sub(f'{pf.MM_}', '', constr_id) locus2uid[label] = uids[0] return locus2uid def _get_ex_rid2sid(self): """Get mapping of cobra exchange reaction ids to RBA uptake metabolites From XML annotation of compartments identify the compartent for media uptake Note: RBA allows other compartments than external compartment to be used in Michaelis Menten saturation terms for medium uptake, e.g. periplasm (however, this may block reactions in periplasm) replace compartment postfix of exchanged metabolited by medium cid of RBA model. Create mapping table from exchange reaction to medium """ medium_cid = 'e' for cid, row in self.m_dict['compartments'].iterrows(): xml_annots = row.get('xmlAnnotation') xml_attrs = sbmlxdf.misc.extract_xml_attrs(xml_annots, ns=XML_COMPARTMENT_NS) if 'medium' in xml_attrs and xml_attrs['medium'].lower == 'true': medium_cid = cid break ex_rid2sid = {} for uptake_rid in self.uptake_rids: row = self.m_dict['reactions'].loc[uptake_rid] # ridx = re.sub(f'^{pf.R_}', '', uptake_rid) sref_str = row['reactants'].split(';')[0] sid = sbmlxdf.extract_params(sref_str)['species'] mid = sid.rsplit('_', 1)[0] ex_rid2sid[uptake_rid] = f'{mid}_{medium_cid}' return ex_rid2sid
[docs] def report_model_size(self): """Print information about the model size. .. code-block:: python eo.report_model_size() """ n_vars_drg = 0 n_vars_ec = 0 n_vars_pmc = 0 if self.is_gpm: self.gpm.update() for var in self.gpm.getVars(): if re.match(pf.V_DRG_, var.VarName): n_vars_drg += 1 elif re.match(pf.V_EC_, var.VarName): n_vars_ec += 1 elif re.match(pf.V_PMC_, var.VarName): n_vars_pmc += 1 model_type = 'MILP' if self.gpm.ismip else 'LP' print(f'{model_type} Model of {self.gpm.modelname}') print(f'{self.gpm.numvars} variables, {self.gpm.numconstrs} constraints, ' f'{self.gpm.numnzs} non-zero matrix coefficients') else: for rxn in self.model.reactions: if re.match(pf.V_DRG_, rxn.id): n_vars_drg += 1 elif re.match(pf.V_EC_, rxn.id): n_vars_ec += 1 elif re.match(pf.V_PMC_, rxn.id): n_vars_pmc += 1 build_str = [] if n_vars_ec > 0: build_str.append(f'{n_vars_ec} enzymes') if n_vars_pmc > 0: build_str.append(f'{n_vars_pmc} process machines') if n_vars_drg > 0: build_str.append(f'{n_vars_drg} TD reaction constraints') if len(build_str) > 0: print(', '.join(build_str))
# COBRAPY MODEL RELATED def _cp_configure_td_model_constraints(self): """Re-configure thermodynamics related variables and constraints in COBRApy model. """ is_td = False for var in self.model.variables: for vprefix in [pf.V_FU_, pf.V_RU_]: if re.match(vprefix, var.name) and 'reverse' not in var.name: is_td = True var.type = 'binary' for constr in self.model.constraints: for cprefix in [pf.C_FFC_, pf.C_FRC_, pf.C_GFC_, pf.C_GRC_, pf.C_SU_]: if re.match(cprefix, constr.name): constr.lb = None if is_td: print(f'Thermodynamic use variables (V_FU_xxx and V_RU_xxx) as binary') print(f'Thermodynamic constraints (C_F[FR]C_xxx, C_G[FR]C_xxx, C_SU_xxx) ≤ 0') # GUROBIPY MODEL CONSTRUCTION RELATED def _gp_create_model_neg_vars(self): """Create and configure a gurobipy model with data from metabolic model. selected variables get converted to non-negative variables (Not used for now) - however impact on RBA, ECM, pfba, fba, gp_moma and gp_room yet to be checked - create empty gp.model: - add variables (using reaction data from metabolic model) - construct and implement linear constraints based on reaction string - construct and implement linear objective - configure TD binary variables and TD related inequality constraints """ # create empty gurobipy model gpm = gp.Model(self.m_dict['modelAttrs'].id) # set model parameters gpm.setParam('OutputFlag', 0) # COBRApy default Feasibility gpm.params.FeasibilityTol = 1e-7 # 1e-6 default gpm.params.OptimalityTol = 1e-7 # 1e-6 default gpm.params.IntFeasTol = 1e-7 # 1e-5 default # add variables to the gurobi model, with suppport of binary variables td_binary_var_prefixes = f'({pf.V_FU_}|{pf.V_RU_})' td_continuous_var_prefixes = f'({pf.V_DRG_}|{pf.V_DRG0_}|{pf.R_})' #td_continuous_var_prefixes = f'xxxxx' self.var_id2gpm = {} for var_id, row in self.m_dict['reactions'].iterrows(): # configure TD forward/reverse use variables as binary if re.match(td_binary_var_prefixes, var_id): self.var_id2gpm[var_id] = gpm.addVar(lb=0.0, ub=1.0, vtype='B', name=var_id) # make selected continuous variables non-negative by splitting elif re.match(td_continuous_var_prefixes, var_id): neg_var_id = f'{var_id}_reverse' self.gp_neg_var[var_id] = neg_var_id self.var_id2gpm[var_id] = gpm.addVar(lb=max(0.0, row['fbcLb']), ub=max(0.0, row['fbcUb']), vtype='C', name=var_id) self.var_id2gpm[neg_var_id] = gpm.addVar(lb=max(0.0, -row['fbcUb']), ub=max(0.0, -row['fbcLb']), vtype='C', name=neg_var_id) # balance continuous variables of the model may assume negative values else: self.var_id2gpm[var_id] = gpm.addVar(lb=row['fbcLb'], ub=row['fbcUb'], vtype='C', name=var_id) # collect constraints with mapping to gp variables constrs = {} for var_id, row in self.m_dict['reactions'].iterrows(): reactants = get_srefs(row['reactants']) products = get_srefs(row['products']) srefs = {constr_id: -coeff for constr_id, coeff in reactants.items()} | products for constr_id, coeff in srefs.items(): if constr_id not in constrs: constrs[constr_id] = self.var_id2gpm[var_id] * coeff else: constrs[constr_id] += self.var_id2gpm[var_id] * coeff # implement splitting of transformed Gibbs energy of reaction variables if re.match(td_continuous_var_prefixes, var_id): constrs[constr_id] += self.var_id2gpm[f'{var_id}_reverse'] * -coeff # adding linear constraints to the model, with support of Thermodynamic constraints # Some TD modeling constraints are defined as LHS <= 0.0 td_constr_prefixes = f'({pf.C_FFC_}|{pf.C_FRC_}|{pf.C_GFC_}|{pf.C_GRC_}|{pf.C_SU_})' self.constr_id2gpm = {} for constr_id, constr in constrs.items(): if re.match(td_constr_prefixes, constr_id): self.constr_id2gpm[constr_id] = gpm.addLConstr(constr, '<', 0.0, constr_id) else: self.constr_id2gpm[constr_id] = gpm.addLConstr(constr, '=', 0.0, constr_id) # configure optimization objective df_fbc_objs = self.m_dict['fbcObjectives'] active_odata = df_fbc_objs[df_fbc_objs['active'] == bool(True)].iloc[0] sense = gp.GRB.MAXIMIZE if 'max' in active_odata['type'].lower() else gp.GRB.MINIMIZE vars_gpm = [] coeffs = [] for sref_str in sbmlxdf.record_generator(active_odata['fluxObjectives']): params = sbmlxdf.extract_params(sref_str) vars_gpm.append(self.var_id2gpm[params['reac']]) coeffs.append(float(params['coef'])) gpm.setObjective(gp.LinExpr(coeffs, vars_gpm), sense) # update gpm with configuration data gpm.update() return gpm def _gp_create_model(self): """Create and configure a gurobipy model with data from metabolic model. - create empty gp.model: - add variables (using reaction data from metabolic model) - construct and implement linear constraints based on reaction string - construct and implement linear objective - construct and implement linear objective - configure TD binary variables and TD related inequality constraints """ # create empty gurobipy model gpm = gp.Model(self.m_dict['modelAttrs'].id) # set model parameters gpm.setParam('OutputFlag', 0) # COBRApy default feasibility gpm.params.FeasibilityTol = 1e-7 # 1e-6 default gpm.params.OptimalityTol = 1e-7 # 1e-6 default gpm.params.IntFeasTol = 1e-7 # 1e-5 default # add variables to the gurobi model, with suppport of binary variables self.var_id2gpm = {} for var_id, row in self.m_dict['reactions'].iterrows(): # configure TD forward/reverse use variables as binary if re.match(pf.V_FU_, var_id) or re.match(pf.V_RU_, var_id): self.var_id2gpm[var_id] = gpm.addVar(lb=0.0, ub=1.0, vtype='B', name=var_id) else: self.var_id2gpm[var_id] = gpm.addVar(lb=row['fbcLb'], ub=row['fbcUb'], vtype='C', name=var_id) # collect constraints with mapping to gp variables constrs = {} for var_id, row in self.m_dict['reactions'].iterrows(): reactants = get_srefs(row['reactants']) products = get_srefs(row['products']) srefs = {constr_id: -coeff for constr_id, coeff in reactants.items()} | products for constr_id, coeff in srefs.items(): if constr_id not in constrs: constrs[constr_id] = self.var_id2gpm[var_id] * coeff else: constrs[constr_id] += self.var_id2gpm[var_id] * coeff # adding linear constraints to the model, with support of Thermodynamic constraints # Some TD modeling constraints are defined as LHS <= 0.0 td_constr_prefixes = f'({pf.C_FFC_}|{pf.C_FRC_}|{pf.C_GFC_}|{pf.C_GRC_}|{pf.C_SU_})' self.constr_id2gpm = {} for constr_id, constr in constrs.items(): if re.match(td_constr_prefixes, constr_id): self.constr_id2gpm[constr_id] = gpm.addLConstr(constr, '<', 0.0, constr_id) else: self.constr_id2gpm[constr_id] = gpm.addLConstr(constr, '=', 0.0, constr_id) # configure optimization objective df_fbc_objs = self.m_dict['fbcObjectives'] active_odata = df_fbc_objs[df_fbc_objs['active'] == bool(True)].iloc[0] sense = gp.GRB.MAXIMIZE if 'max' in active_odata['type'].lower() else gp.GRB.MINIMIZE vars_gpm = [] coeffs = [] for sref_str in sbmlxdf.record_generator(active_odata['fluxObjectives']): params = sbmlxdf.extract_params(sref_str) vars_gpm.append(self.var_id2gpm[params['reac']]) coeffs.append(float(params['coef'])) gpm.setObjective(gp.LinExpr(coeffs, vars_gpm), sense) # update gpm with configuration data gpm.update() return gpm def _get_biomass_rid(self): """Extract biomass reaction id from model fbcObjectives. :return: biomass reaction id (without `R_`) :rtype: str """ df_fbc_objs = self.m_dict['fbcObjectives'] active_odata = df_fbc_objs[df_fbc_objs['active'] == bool(True)].iloc[0] sref_str = active_odata['fluxObjectives'].split(';')[0] return re.sub(pf.R_, '', sbmlxdf.extract_params(sref_str)['reac']) # RETRIEVING DATA REQUIRED FOR RESULTS ANALYSIS @staticmethod def _get_reaction_str(rdata): """Generate the reactions string from a reaction object. e.g. for PFK_iso1: 'M_2pg_c => M_adp_c + M_fdp_c + M_h_c' e.g. for PFK_iso1_REV: 'M_adp_c + M_fdp_c + M_h_c => M_2pg_c' Exclude constraints (starting with `C_`) :param dict(str, dict) rdata: reaction data :return: reactions sting :rtype: str """ lparts = [] rparts = [] for sid, stoic in get_srefs(rdata['reactants']).items(): if re.match(pf.C_, sid) is None: # drop stoichiometric coefficients that are 1.0 stoic_str = f'{round(stoic, 4)} ' if stoic != 1.0 else '' lparts.append(f'{stoic_str}{sid}') for sid, stoic in get_srefs(rdata['products']).items(): if re.match(pf.C_, sid) is None: stoic_str = f'{round(stoic, 4)} ' if stoic != 1.0 else '' rparts.append(f'{stoic_str}{sid}') dirxn = ' -> ' if rdata['reversible'] else ' => ' return ' + '.join(lparts) + dirxn + ' + '.join(rparts) def _expand_pseudo_metabolite(self, rid, reaction_str): """Expand pseudo metabolite in reaction string of iso reaction. Pseudo metabolites `pmet_` are used, when `_arm` reactions are introduced, e.g. in GECKO models. '_arm' reactions combine fluxes of iso-reactions. Iso reactions are connected via pseudo metabolites to the arm reaction E.g. reversible iML1515 reaction R_FBA in GECKO model FBA_arm, FBA_iso1, FBA_iso2 FBA_arm_REV, FBA_iso1_REV, FBA_iso2_REV :param str rid: iso reaction idenitifer :param str reaction_str: reaction string :return: reactions string with metabolite :return: str """ arm_rid = re.sub(r'_iso\d+', '_arm', rid) arm_rdata = self.m_dict['reactions'].loc[arm_rid] parts = re.split(' [-=]> ', self._get_reaction_str(arm_rdata)) expand_pmet = parts[1] if re.search(r'\w*pmet_\w+', parts[0]) else parts[0] return re.sub(r'\w*pmet_\w+', expand_pmet, reaction_str) def _get_reaction_compartments(self, reaction_str): """From reaction string extract participating compartments e.g. reaction_str = 'M_2dmmq8_c + M_h2_c + 2.0 M_h_c => M_2dmmql8_c + 2.0 M_h_p' return: 'c_p' :param str reaction_str: species srefs string as per sbmlxdf reactants/products fields in reactions :return: sorted compartment ids joined by '-' :rtype: str """ cids = set() srefs = parse_reaction_string(reaction_str) for idx, r_side in enumerate(['reactants', 'products']): for sref_str in sbmlxdf.record_generator(srefs[r_side]): params = sbmlxdf.extract_params(sref_str) sid = params['species'] cids.add(self.m_dict['species'].loc[sid].compartment) return '-'.join(sorted(cids)) def _get_reaction_data(self): """Extract reaction related data from reactions (reaction string, gpr) Supports ARM reactions (e.g. GECKO model) by expanding pseudo metabolites. Data can be used in augmenting flux related optimization results :return: reaction related data, indexed by reaction id :rtype: dict """ gpid2label = self.m_dict['fbcGeneProducts']['label'].to_dict() rdatas = {} for rid, row in self.m_dict['reactions'].iterrows(): if re.match(pf.V_, rid) is None and re.search('_arm', rid) is None: drxn = 'rev' if re.search('_REV$', rid) else 'fwd' fwd_rid = re.sub('_REV$', '', rid) net_rid = re.sub(r'_iso\d+', '', fwd_rid) groups = '; '.join(self.id2groups.get(rid, '')) reaction_str = self._get_reaction_str(row) if re.search(r'pmet_\w+', reaction_str) is not None: reaction_str = self._expand_pseudo_metabolite(rid, reaction_str) # translate gene product ids to gene labels parts = [] if type(row['fbcGeneProdAssoc']) is str: assoc = re.sub('assoc=', '', row['fbcGeneProdAssoc']) for item in re.findall(r'\b\w+\b', assoc): if item in gpid2label: parts.append(gpid2label[item]) elif item in {'and', 'or'}: parts.append(item) gpr = ' '.join(parts) # mpmf coupling used in GECKO models for fitting kcats to proteomics (not required for RBA) mpmf_coupling = {} if gpr != '': loci = [item.strip() for item in gpr.split(' and ')] reactants = get_srefs(row['reactants']) for locus in loci: locus = valid_sbml_sid(locus) # note: FBA/TFA models have not data in locus2uid if locus in self.locus2uid: ecm_coupling_constr_id = f'{pf.C_prot_}{self.locus2uid[locus]}' if ecm_coupling_constr_id in reactants: mpmf_coupling[locus] = reactants[ecm_coupling_constr_id]/self.protein_per_gdw exchange = False if type(row['products']) is str else True rp_cids = self._get_reaction_compartments(reaction_str) r_type = 'transport' if '-' in rp_cids else 'metabolic' rdatas[rid] = {'net_rid': net_rid, 'drxn': drxn, 'reaction_str': reaction_str, 'gpr': gpr, 'mpmf_coupling': mpmf_coupling, 'is_exchange': exchange, 'r_type': r_type, 'compartment': rp_cids, 'groups': groups} return rdatas
[docs] def get_tx_metab_genes(self): """Get gene identifiers grouped by transport and metabolic reactions. Genes required in both transport and metabolic enzymes are designated transport related genes. Example: Extract sets of transport and metabolic related genes. .. code-block:: python tx_genes, metab_genes = eo.get_tx_metab_genes() :return: gene identifiers grouped by transport and metabolic enzymes :rtype: (set(str), set(str)) """ tx_genes = set() metab_genes = set() for data in self.rdata.values(): if len(data['gpr']) > 0: gpr_del_or = re.sub(' or ', ',', data['gpr']) gpr_del_or_and = re.sub(' and ', ',', gpr_del_or) genes = [item.strip() for item in gpr_del_or_and.split(',')] if data['r_type'] == 'transport': for gene in genes: tx_genes.add(gene) elif data['r_type'] == 'metabolic': for gene in genes: metab_genes.add(gene) metab_genes = metab_genes.difference(tx_genes) return tx_genes, metab_genes
@staticmethod def _extract_net_reaction_data(rdata): """Extract net reaction data, i.e. unsplit reactions Net reaction is a reaction with reaction string in forward direction Reversibility arrow is adjusted if there exists a corresponding reaction in reverse Gene Product Associations are combined using 'or' for iso-reactions :param dict(dict) rdata: reaction information :return: reaction data with isoreactions combined :rtype: dict(dict) """ # determine 'net' reaction data (i.e. unsplit the reaction) rev_net_rids = {record['net_rid'] for rid, record in rdata.items() if record['drxn'] == 'rev'} net_rdata = {} for rid, record in rdata.items(): if record['drxn'] == 'fwd': net_rid = record['net_rid'] if net_rid not in net_rdata: reaction_str = record['reaction_str'] if net_rid in rev_net_rids: reaction_str = re.sub('=>', '->', reaction_str) net_rdata[net_rid] = {'reaction_str': reaction_str, 'gpr': record['gpr'], 'groups': record['groups']} else: net_rdata[net_rid]['gpr'] += ' or ' + record['gpr'] return net_rdata def get_rids_catalyzed(self): """Get reactions that are catalyzed with enzyme composition (gene labels). Data can be used to assess impact of gene deletions Data can be used to identify reactions catalyzed by a specific gene Note: does not support 'or' in alternative components of enzyme complex like (b1234 or b2346) and b2222 Workaround: rearrange gpr: (b1234 and b2222) or (b2346 and b2222) :meta private: :return: enzymes and their compositions of catalyzed reactions :rtype: dict (key: rid/str, val: list of sets of gene labels) """ gpid2label = self.m_dict['fbcGeneProducts']['label'].to_dict() rids_catalyzed = {} for rid, rdata in self.m_dict['reactions'].iterrows(): if re.match(pf.V_, rid) is None: enzymes = [] if type(rdata['fbcGeneProdAssoc']) is str: assoc = re.sub('assoc=', '', rdata['fbcGeneProdAssoc']) for enz_str in assoc.split('or'): labels = set() for gpid in re.findall(r'\b\w+\b', enz_str): if gpid in gpid2label: labels.add(gpid2label[gpid]) enzymes.append(labels) if len(enzymes) > 0: rids_catalyzed[rid] = enzymes return rids_catalyzed # SUPPORT FUNCTIONS FOR GENE DELETION STUDY
[docs] def get_rids_blocked_by(self, genes): """Identify reactions that get blocked when deleting selected genes. Example: Identify reactions that are blocked by the strain background. .. code-block:: python strain_background = {'b0063', 'b0062', 'b0061', 'b3904', 'b3903', 'b3902', 'b0344'} blocked_rids = eo.get_rids_blocked_by(strain_background) :param genes: gene or genes :type genes: str or list(str) :return: reaction ids affected by gene deletions :return: list(str) """ if type(genes) is str: genes = {genes} elif type(genes) is list: genes = set(genes) assert type(genes) is set, 'genes must be of type str of a set/list of str' blocked_rids = [] for rid, enzymes in self.rids_catalyzed.items(): impacted = True for enzyme in enzymes: if len(genes.intersection(enzyme)) == 0: impacted = False break if impacted: blocked_rids.append(rid) return blocked_rids
# OPTIMIZATION RELATED
[docs] def get_variable_bounds(self, var_ids): """Retrieve upper/lower bounds of selected model variables. Bounds can be queried for a single variable or a list of variables. Example: Retrieve variable bounds for single variable, list of variables. .. code-block:: python bounds_1 = eo.get_variable_bounds('V_PC_total') bounds_2 = eo.get_variable_bounds(['POR5_iso1', 'POR5_iso1_REV']) :param var_ids: variable identifier or a list of variable identifiers :type var_ids: str or list(str) :return: variable ids with respective lower und upper bounds :rtype: dict(str, tuple) """ if type(var_ids) is str: var_ids = [var_ids] variable_bounds = {} if self.is_gpm: self.gpm.update() # support splitting of variables in two non-negative variables. for var_idx in var_ids: var_id = f'{pf.R_}{var_idx}' if f'{pf.R_}{var_idx}' in self.var_id2gpm else var_idx assert var_id in self.var_id2gpm, f'{var_id} not found' var = self.var_id2gpm[var_id] if var_id in self.gp_neg_var: neg_var = self.var_id2gpm[self.gp_neg_var[var_id]] ub = var.ub if neg_var.lb == 0.0 else -neg_var.lb lb = -neg_var.ub if var.lb == 0.0 else var.lb else: ub = var.ub lb = var.lb variable_bounds[var_idx] = (lb, ub) else: for var_id in var_ids: assert var_id in self.model.reactions, f'{var_id} not found' variable_bounds[var_id] = self.model.reactions.get_by_id(var_id).bounds return variable_bounds
[docs] def set_variable_bounds(self, variable_bounds): """Set upper/lower bounds of selected model variables. Bound values set to 'None' will not be modified. Example: Block iso-reaction POR5_iso1 in forward and reverse directions. Only upper bounds need to be changed to 0.0. .. code-block:: python orig_bounds = eo.set_variable_bounds({'POR5_iso1': (None, 0.0), 'POR5_iso1_REV': (None, 0.0)}) :param dict(str, tuple) variable_bounds: variable identifiers with tuple of lower and upper bounds :return: original bounds :rtype: dict(str, tuple) """ orig_bounds = self.get_variable_bounds(list(variable_bounds)) if self.is_gpm: self.gpm.update() # support splitting of variables in two non-negative variables. for var_idx, (lb, ub) in variable_bounds.items(): var_id = f'{pf.R_}{var_idx}' if f'{pf.R_}{var_idx}' in self.var_id2gpm else var_idx assert var_id in self.var_id2gpm, f'{var_id} not found' var = self.var_id2gpm[var_id] if var_id in self.gp_neg_var: neg_var = self.var_id2gpm[self.gp_neg_var[var_id]] if ub is not None: if ub >= 0.0: var.ub = ub if neg_var.lb > 0.0: neg_var.lb = 0.0 else: neg_var.lb = -ub if var.ub > 0.0: var.ub = 0.0 if lb is not None: if lb >= 0.0: var.lb = lb if neg_var.ub > 0.0: neg_var.ub = 0.0 else: neg_var.ub = -lb if var.lb > 0.0: var.lb = 0.0 else: if lb is not None: var.lb = lb if ub is not None: var.ub = ub else: for var_id, (lb, ub) in variable_bounds.items(): assert var_id in self.model.reactions, f'{var_id} not found' rxn = self.model.reactions.get_by_id(var_id) # orig_bounds[var_id] = rxn.bounds if (lb is not None) and (ub is not None): rxn.bounds = (lb, ub) elif lb is not None: rxn.lower_bound = lb elif ub is not None: rxn.upper_bound = ub return orig_bounds
[docs] def get_objective(self): """Retrieve model optimization objective. Example: Query optimization objective of the model. .. code-block:: python eo.get_objective() :return: variable coefficients and optimization direction :rtype: dict(str, float), str """ if self.is_gpm: lin_expr = self.gpm.getObjective() objective = {} for idx in range(lin_expr.size()): var_id = lin_expr.getVar(idx).VarName objective[re.sub(pf.R_, '', var_id)] = lin_expr.getCoeff(idx) direction = {gp.GRB.MINIMIZE: 'min', gp.GRB.MAXIMIZE: 'max'}[self.gpm.ModelSense] else: objective = {} sympy_expr = self.model.solver.objective.expression for var, coeff in sympy_expr.as_coefficients_dict().items(): varname = var.name if '_reverse_' not in varname: objective[varname] = coeff direction = self.model.objective.direction return objective, direction
[docs] def set_objective(self, objective, direction='max'): """Set optimization objective. Example: Modify the optimization objective of a model. .. code-block:: python eo.set_objective({'BIOMASS_Ec_iML1515_WT_75p37M': 1.0}, 'max') :param dict(str, float) objective: variable identifiers with coefficients :param str direction: direction of optimization: 'min' or 'max' (default: 'max') """ if self.is_gpm: sense = gp.GRB.MAXIMIZE if 'max' in direction.lower() else gp.GRB.MINIMIZE variables = [] coeffs = [] for var_id, coeff in objective.items(): if var_id not in self.var_id2gpm: var_id = f'{pf.R_}{var_id}' assert var_id in self.var_id2gpm variables.append(self.var_id2gpm[var_id]) coeffs.append(coeff) self.gpm.setObjective(gp.LinExpr(coeffs, variables), sense) self.gpm.update() else: self.model.objective = {self.model.reactions.get_by_id(var_id): coeff for var_id, coeff in objective.items()} self.model.objective_direction = direction
[docs] def set_tfa_metab_concentrations(self, metab_concs): """Configure metabolite concentration ranges in mol/l for TD constraint models. Metabolomics data for a given condition can be used to further constrain models that contain thermodynamics constraints. Such models have optimization variables for species concentrations. Minimum and maximum concentrations for selected species can be configured using this method. Concentration bounds will not be set for 'None' values. Example: Set minimum and maximum concentration values (mol/l) for selected metabolites. .. code-block:: python molar_concs = {'atp_c': (0.00178, 0.00712), 'adp_c': (5.8e-05, 0.00023), 'datp_c': (9.05e-05, 0.00036)} eo.set_tfa_metab_concentrations(molar_concs) :param dict(str, tuple) metab_concs: min and max concentrations for species in mol/l :return: original concentrations (min, max) :rtype: dict (str, tuple(float)) """ variable_bounds = {} for sid, (lb, ub) in metab_concs.items(): sidx = re.sub('^M_', '', sid) var_id = f'{pf.V_LC_}{sidx}' log_lb = np.log(lb) if lb else None log_ub = np.log(ub) if ub else None variable_bounds[var_id] = (log_lb, log_ub) orig_log_bounds = self.set_variable_bounds(variable_bounds) orig_bounds = {} for var_id, (log_lb, log_ub) in orig_log_bounds.items(): sidx = re.sub(pf.V_LC_, '', var_id) lb = np.exp(log_lb) if log_lb else None ub = np.exp(log_ub) if log_ub else None orig_bounds[sidx] = (lb, ub) return orig_bounds
[docs] def update_relaxation(self, fluxes, eps=0.5): """TFA slack model relaxation for a specified flux distribution. In case optimizations of a model with added thermodynamics constraints leads to infeasable solutions, model relaxation is required. Model relaxation is performed on a TFA model with slack variables configured for ∆rG'˚ (standard transformed Gibbs energy of reaction) variables. Use TfaModel.export_slack_model() to generate such a model. This method is used to determine adjusted ∆rG'˚ variable bounds that make the TD model feasible and to configure the updated bounds to support optimization of further conditions. Example 1: Perform TFA model relaxation using COBRApy interface. With the additional constraint of achieving at least 95% of the growth rate of the model without TD constraints. .. code-block:: python import cobra from f2xba import FbaOptimization tfa_slack_model = cobra.io.read_sbml_model('iML1515_TFA_slack.xml') fo = FbaOptimization('iML1515_TFA_slack.xml', tfa_slack_model) tfa_slack_model.reactions.get_by_id('BIOMASS_Ec_iML1515_core_75p37M').lower_bound = 0.95 * wt_gr tfa_slack_model.medium = medium solution = tfa_slack_model.optimize() drg0_relaxations = fo.update_relaxation(solution.fluxes) Example 2: Perform TFA model relaxation using cobrapy interface. .. code-block:: python from f2xba import FbaOptimization fo = FbaOptimization('iML1515_TFA_slack.xml') orig_bounds = fo.set_variable_bounds({'BIOMASS_Ec_iML1515_core_75p37M':(0.95 * wt_gr, None)}) fo.medium = medium solution = fo.optimize() fo.set_variable_bounds(orig_bounds) drg0_relaxations = fo.update_relaxation(solution.fluxes) :param dict fluxes: fluxes of a TFA slack model optimization solution :param float eps: additional margin in kJ/mol (default: 0.5) :return: ∆rG'˚ variables with updated bounds. :rtype: dict(str, dict(str,float) """ # collect DRG0 variables that need relaxation slack_vars = {} for var_id, val in fluxes.items(): if re.match(pf.V_NS_, var_id) and val > 1e-7: slack_vars[re.sub(f'^{pf.V_NS_}', pf.V_DRG0_, var_id)] = -val elif re.match(pf.V_PS_, var_id) and val > 1e-7: slack_vars[re.sub(f'^{pf.V_PS_}', pf.V_DRG0_, var_id)] = val # relax DRG0 variables by adjusting respective bound drg0_relaxations = {} update_bounds = {} for drg0_id, (lb, ub) in self.get_variable_bounds(list(slack_vars)).items(): slack = slack_vars[drg0_id] if slack < 0.0: new_lb = lb + slack - eps update_bounds[drg0_id] = (new_lb, None) drg0_relaxations[drg0_id] = {'fbc_lower_bound': new_lb} else: new_ub = ub + slack + eps update_bounds[drg0_id] = (None, new_ub) drg0_relaxations[drg0_id] = {'fbc_upper_bound': new_ub} self.set_variable_bounds(update_bounds) return drg0_relaxations
[docs] def modify_stoic(self, constr_id, var_id, new_val): """Modify a stoichiometric coefficient using gurobipy interface. Stoichiometric coefficients in a COBRApy model can be modified using the COBRApy method cobra.core.reaction.add_metabolites(). This method supports the modification of a stoichiometric coefficient when using the gurobipy interface. As model contexts are not supported, the coefficient should be reset to the original value after optimization. Example: Remove the cofactor requirement of hemeA in anaerobic conditions. .. code-block:: python if 'EX_o2_e' not in medium: orig_stoic = eo.modify_stoic('M_hemeA_c', 'R_CofactorSynth', 0.0) eo.optimize() if 'EX_o2_e' not in medium: eo.modify_stoic('M_hemeA_c', 'R_CofactorSynth', orig_stoic) :param str constr_id: constraint identifier, e.g. species id :param str var_id: variable identifier, e.g. reaction id :param float new_val: stoichiometric coefficient (negative for consumation) :return: original stoichiometric coefficient :rtype: float """ if self.is_gpm: constr = self.gpm.getConstrByName(constr_id) var = self.gpm.getVarByName(var_id) if constr is None: print(f'Constraint with identifier "{constr_id}" not found!') return None if var is None: print(f'Variable with identifier "{var_id}" not found!') return None old_val = self.gpm.getCoeff(constr, var) self.gpm.chgCoeff(constr, var, new_val) if var_id in self.gp_neg_var: neg_var = self.gpm.getVarByName(self.gp_neg_var[var_id]) self.gpm.chgCoeff(constr, neg_var, -new_val) return old_val else: print('Method only implemented for gurobipy interface, for COBRApy, use rxn.add_metabolites()') return None
def set_medium(self, medium): """Configure medium in gurobipy model from dictionary with uptake fluxes. It is recommended to rather assign the medium as in COBRApy (use medium property) e.g. medium = {'EX_glc__D_e': 10.0, 'EX_o2_e': 20.0, 'EX_ca2_e': 1000.0, ...} For gurobipy implementation, support splitting of variables in two non-negative variables. :meta private: :param dict(str, float) medium: nutrients in medium, i.e. exchange reactions with max uptake rate """ if self.is_gpm: update_bounds = {} for ex_rid in self.uptake_rids: ex_ridx = re.sub('^R_', '', ex_rid) if ex_ridx in medium: update_bounds[ex_rid] = (-medium[ex_ridx], None) else: update_bounds[ex_rid] = (0.0, None) self.set_variable_bounds(update_bounds) else: self.model.medium = medium
[docs] def optimize(self, alt_model=None): """Optimize gurobipy model and return COBRApy compliant solution. Example: Optimize the model. .. code-block:: python eo.optimize() Thermodynamics constrained models are supported inline with pyTFA package. :param alt_model: gurobipy model (default: None) :type alt_model: :class:`gurobipy.Model` :return: optimization solution :rtype: :class:`Solution` """ model = self.gpm if alt_model is None else alt_model model.optimize() solution = self._gp_get_solution(model) return solution
def _gp_combine_neg_vars(self, data_raw): """Combine negative variable values from gurobipy optimization. :param dict(str, float) data_raw: optimization variables with values. :return: """ data = {} neg_var_ids = set(self.gp_neg_var.values()) for var_id, val in data_raw.items(): var_idx = re.sub(f'^{pf.R_}', '', var_id) if var_id not in neg_var_ids: if var_id in self.gp_neg_var: data[var_idx] = data_raw[var_id] - data_raw[self.gp_neg_var[var_id]] else: data[var_idx] = data_raw[var_id] return data def _gp_get_solution(self, alt_model=None): """Retrieve optimization solution for gp model. Solution as per COBRApy solution Reaction and Metabolites ids without `R_`, `M_` prefix - status - objective value - fluxes - reduced costs - shadow prices :param alt_model: (optional) alternative model (e.g. pFBA model) :type alt_model: gurobipy.Model, if provided :return: optimization solution :rtype: class:`Solution` """ model = self.gpm if alt_model is None else alt_model results_dict = {'status': status2text[model.status].lower(), 'tmp': pd.Series()} if hasattr(model, 'objval'): results_dict['objective_value'] = model.objval if hasattr(model.getVars()[0], 'x'): data_raw = {var.VarName: var.X for var in model.getVars()} results_dict['fluxes'] = pd.Series(self._gp_combine_neg_vars(data_raw), name='fluxes') if hasattr(model.getVars()[0], 'rc'): data_raw = {var.VarName: var.RC for var in model.getVars()} results_dict['reduced_costs'] = pd.Series(self._gp_combine_neg_vars(data_raw), name='reduced_costs') if hasattr(model.getConstrs()[0], 'pi'): data = {re.sub(f'^{pf.M_}', '', constr.ConstrName): constr.Pi for constr in model.getConstrs()} results_dict['shadow_prices'] = pd.Series(data, name='shadow_prices') del results_dict['tmp'] return Solution(**results_dict)
[docs] def pfba(self, fraction_of_optimum=1.0): """Perform parsimonious FBA optimization (pFBA) on a FBA model using the gurobipy interface. Example: Execute a pFBA optimization using the gurobipy interface. .. code-block:: python solution = eo.pfba() :param float fraction_of_optimum: factor to scale FBA objective (default: 1.0) :return: optimization solution :return: :class:`Solution` """ if self.is_gpm is None: print('Method implemented for gurobipy interface only.') return None # determine fba growth rate fba_solution = self.optimize() fba_gr = fba_solution.objective_value pfba_solution = fba_solution.objective_value if np.isfinite(fba_gr): # create an pFBA model based on FBA model and make reactions irreversible pfba_gpm = self.gpm.copy() for var in pfba_gpm.getVars(): if var.lb < 0: fwd_col = pfba_gpm.getCol(var) rev_coeffs = [-fwd_col.getCoeff(idx) for idx in range(fwd_col.size())] constrs = [fwd_col.getConstr(idx) for idx in range(fwd_col.size())] rev_col = gp.Column(coeffs=rev_coeffs, constrs=constrs) pfba_gpm.addVar(lb=0.0, ub=-var.lb, vtype=var.VType, name=f'{var.VarName}_REV', column=rev_col) var.lb = 0.0 # create temporary constraint for FBA objective fba_objective = pfba_gpm.getObjective() pfba_gpm.addLConstr(fba_objective, '=', fba_gr * fraction_of_optimum, 'FBA objective') # New Objective: minimize sum of all non-negative reactions pfba_gpm.update() vars_gpm = [var for var in pfba_gpm.getVars()] pfba_objective = gp.LinExpr(np.ones(len(vars_gpm)), vars_gpm) pfba_gpm.setObjective(pfba_objective, gp.GRB.MINIMIZE) # optimize and collect results (without error handling) pfba_gpm.optimize() pfba_solution = self._gp_get_solution(pfba_gpm) pfba_solution.fluxes = extract_net_fluxes(pfba_solution.fluxes) pfba_gpm.close() return pfba_solution
[docs] def fva(self, rids=None, fraction_of_optimum=1.0): """Perform a Flux Variability Analysis (FVA) on a FBA model using the gurobipy interface. A list of reaction ids can be provided to limit the scope of the FVA analysis. Example: Execute FVA analyis for a set of reactions using the gurobipy interface. .. code-block:: python eo.fva(['CS', 'ME1', 'POR5_iso1', 'POR5_iso1_REV']) :param list(str) rids: reaction identifiers (without leading `R_`) (default: None) :param float fraction_of_optimum: scaling of wild type objective value (default: 1.0) :return: table with minimum and maximum reaction fluxes :rtype: pandas.DataFrame """ if self.is_gpm is None: print('Method implemented for gurobipy interface only.') return None ridx2rid = {re.sub('R_', '', rid): rid for rid in self.m_dict['reactions'].index if re.match('^R_', rid)} selected_rids = list(ridx2rid.values()) if rids is None else [ridx2rid[ridx] for ridx in rids] # determine wildtype growth rate wt_gr = self.optimize().objective_value results = {} if np.isfinite(wt_gr): # create a temporary FVA model based on FBA model fva_gpm = self.gpm.copy() # add wt objective as constraint wt_objective = fva_gpm.getObjective() fva_gpm.addLConstr(wt_objective, '=', wt_gr * fraction_of_optimum, 'wild type objective') fva_gpm.update() for rid in selected_rids: var = fva_gpm.getVarByName(rid) fva_gpm.setObjective(var, gp.GRB.MINIMIZE) min_flux = self.optimize(fva_gpm).objective_value fva_gpm.setObjective(var, gp.GRB.MAXIMIZE) max_flux = self.optimize(fva_gpm).objective_value results[re.sub('^R_', '', rid)] = [min_flux, max_flux] fva_gpm.close() return pd.DataFrame(results.values(), list(results.keys()), columns=['minimum', 'maximum'])
def solve(self, **rba_solve_params): """RBA solver, overwritten by RbaOptimization :meta private: """ return pd.DataFrame() def get_active_genes(self, fluxes): """Determine active genes based on a reaction flux distribution. All genes required for isoenzymes of a reactions carrying flux are collected. :meta private: :param dict(str, float) fluxes: reaction fluxes in mmol/gDWh :return: set of genes potentially required for given reaction flux destribution :rtype: set """ # map reaction ids without 'R_' prefix, as per COBRApy to reaction ids used in the model ridx2rid = {re.sub(f'^{pf.R_}', '', rid): rid for rid in self.rids_catalyzed} gene_list = set() for ridx, flux in fluxes.items(): if abs(flux) > 0.0: if ridx in ridx2rid: for enz_comp in self.rids_catalyzed[ridx2rid[ridx]]: for gene in enz_comp: gene_list.add(gene) print(f'{len(gene_list)} genes potentially active given flux distribution.') return gene_list
[docs] def gene_knock_outs(self, genes): """Block reactions that are catalyzed by specified gene/genes. Example: Block reactions that depend on the specified gene and perform a model optimization. .. code-block:: python orig_bounds = eo.gene_knock_outs('b0025') solution = eo.optimize() eo.set_variable_bounds(orig_bounds) print(f'mutant gr: {solution.objective_value:.3f} h-1') :param genes: gene identifiers :type genes: str or list(str) :return: original reaction bounds (lb, ub) :rtype: dict(str, tuple) """ if type(genes) is str: genes = [genes] if self.is_gpm: update_bounds = {var_id: (0.0, 0.0) for var_id in self.get_rids_blocked_by(genes)} orig_rid_bounds = self.set_variable_bounds(update_bounds) else: orig_rid_bounds = {} for gene in genes: assert gene in self.model.genes, f'{gene} not found' for react in self.model.genes.get_by_id(gene).reactions: orig_rid_bounds[react.id] = (react.lower_bound, react.upper_bound) for gene in genes: self.model.genes.get_by_id(gene).knock_out() return orig_rid_bounds
[docs] def moma(self, wt_fluxes, linear=False): """Perform a MOMA based optimization using the gurobipy interface. Supported for FBA and ECM models. Ref: Segre et al. 2002, Analysis of optimality in natural and perturbed metabolic networks. MOMA (`linear` = False): minimize: ∑ (v - wt)^2 s.t.: S * v = 0; v_lb ≤ v ≤ v_ub v: disturbed flux distribution; wt: wild type flux distribution. This requires adding transformed variables `vt` with adjusted bounds, adding new constraints `vt = v - wt` and minimizing the objective function `∑ vt^2`. Linear MOMA (`linear` = True): minimize: ∑ abs(v - wt) s.t.: S * v = 0; v_lb ≤ v ≤ v_ub introducing non-negative transformed flux variables and related constraints and minimizing `∑ (vtp + vtn)` `vtp`: [0 ... (v_ub - wt)] with constraints `vtp ≥ v - wt` `vtn`: [0 ... (wt - v_lb)] with constraints `vtn ≥ wt - v` Example: Perform MOMA analysis for a disturbed network, caused by gene deletion. .. code-block:: python fo.medium = medium wt_solution = fo.pfba() orig_bounds = fo.gene_knock_outs('b0003') moma_solution = fo.moma(wt_solution.fluxes) fo.set_variable_bounds(orig_bounds) print(f'{moma_solution.fluxes['BIOMASS_Ec_iML1515_core_75p37M']:.3f} h-1') :param dict(str, float) wt_fluxes: wild type flux distribution, e.g. pFBA fluxes :param bool linear: using quadratic (Euclidean distance) or linear formulation (Manhattan) (default: False) :return: MOMA determined solution :rtype: :class:`Solution` """ if self.is_gpm is None: print('Method implemented for gurobipy interface only.') return pd.DataFrame() self.gpm.update() moma_gpm = self.gpm.copy() tflux_vars = [] for ridx, wt_flux in wt_fluxes.items(): flux_var = moma_gpm.getVarByName(f'R_{ridx}') if flux_var is None: flux_var = moma_gpm.getVarByName(ridx) if not linear: tflux_lb = flux_var.LB - wt_flux tflux_ub = flux_var.UB - wt_flux tflux_var = moma_gpm.addVar(lb=tflux_lb, ub=tflux_ub, vtype=gp.GRB.CONTINUOUS, name=f'V_MOMA_transformed_flux_{ridx}') tflux_vars.append(tflux_var) moma_gpm.addLConstr(flux_var - tflux_var, sense=gp.GRB.EQUAL, rhs=wt_flux) else: tfluxp_ub = flux_var.UB - wt_flux # if wild type flux already at the upper boundary we do not require fluxp variable/constraint if tfluxp_ub > 0: tfluxp_var = moma_gpm.addVar(lb=0, ub=tfluxp_ub, vtype=gp.GRB.CONTINUOUS, name=f'V_MOMA_transformed_fluxp_{ridx}') tflux_vars.append(tfluxp_var) moma_gpm.addLConstr(flux_var - tfluxp_var, sense=gp.GRB.LESS_EQUAL, rhs=wt_flux) tfluxn_ub = wt_flux - flux_var.LB if tfluxn_ub > 0: tfluxn_var = moma_gpm.addVar(lb=0, ub=tfluxn_ub, vtype=gp.GRB.CONTINUOUS, name=f'V_MOMA_transformed_fluxn_{ridx}') tflux_vars.append(tfluxn_var) moma_gpm.addLConstr(flux_var + tfluxn_var, sense=gp.GRB.GREATER_EQUAL, rhs=wt_flux) if not linear: moma_gpm.setObjective(gp.quicksum([v * v for v in tflux_vars]), gp.GRB.MINIMIZE) else: moma_gpm.setObjective(gp.quicksum(tflux_vars), gp.GRB.MINIMIZE) moma_gpm.optimize() moma_solution = self._gp_get_solution(moma_gpm) moma_gpm.close() return moma_solution
[docs] def room(self, wt_fluxes, linear=False, delta=0.03, epsilon=1e-3, time_limit=30.0): """Perform a ROOM based optimization using the gurobipy interface. Ref: Shlomi et al., 2005, Regulatory on off minimization of metabolic flux changes after genetic perturbations ROOM (`linear` = False): minimize: ∑ y^i s.t. S * v = 0; v_lb ≤ v ≤ v_ub v - y * (v_ub - wt_u) <= wt_u; wt_u = wt + delta * abs(wt) + \epsilon v - y * (v_lb - wt_l) <= wt_l; wt_l = wt - delta * abs(wt) - \epsilon v: disturbed flux distribution; wt: wild type flux distribution. y_i ∈ {0,1} Linear ROOM (`linear` = True): 0 <= y_i <= 1 Example: Perform ROOM analysis for a disturbed network, caused by gene deletion. .. code-block:: python fo.medium = medium wt_solution = fo.pfba() orig_bounds = fo.gene_knock_outs('b0003') room_solution = fo.room(wt_solution.fluxes) fo.set_variable_bounds(orig_bounds) print(f'{room_solution.fluxes['BIOMASS_Ec_iML1515_core_75p37M']:.3f} h-1') :param dict(str, float) wt_fluxes: wild type flux distribution, e.g. pFBA fluxes :param bool linear: using MILP (False) or relaxed LP (True) formulation (default: False) :param float delta: relative tolerance range (default: 0.03) :param float epsilon: absolute tolerance range (default: 1e-3) :param float time_limit: time limit in seconds (default: 30.0) :return: ROOM determined solution :rtype: :class:`Solution` """ if self.is_gpm is None: print('Method implemented for gurobipy interface only.') return None self.gpm.update() room_gpm = self.gpm.copy() if not linear: room_gpm.params.timelimit = time_limit flux_ctrl_vars = [] for ridx, wt_flux in wt_fluxes.items(): flux_var = room_gpm.getVarByName(f'R_{ridx}') if flux_var is None: flux_var = room_gpm.getVarByName(ridx) flux_lb = flux_var.LB flux_ub = flux_var.UB if not linear: wl_flux = wt_flux - abs(wt_flux) * delta - epsilon wu_flux = wt_flux + abs(wt_flux) * delta + epsilon else: wl_flux = wt_flux wu_flux = wt_flux if wl_flux > flux_lb: if not linear: flux_lower_ctrl_var = room_gpm.addVar(lb=0, ub=1, vtype=gp.GRB.BINARY, name=f'V_ROOM_wl_control_{ridx}') else: flux_lower_ctrl_var = room_gpm.addVar(lb=0.0, ub=1.0, vtype=gp.GRB.CONTINUOUS, name=f'V_ROOM_wl_control_{ridx}') flux_ctrl_vars.append(flux_lower_ctrl_var) room_gpm.addLConstr(flux_var - flux_lower_ctrl_var * (flux_lb - wl_flux), sense=gp.GRB.GREATER_EQUAL, rhs=wl_flux) if wu_flux < flux_ub: if not linear: flux_upper_ctrl_var = room_gpm.addVar(lb=0, ub=1, vtype=gp.GRB.BINARY, name=f'V_ROOM_wu_control_{ridx}') else: flux_upper_ctrl_var = room_gpm.addVar(lb=0.0, ub=1.0, vtype=gp.GRB.CONTINUOUS, name=f'V_ROOM_wu_control_{ridx}') flux_ctrl_vars.append(flux_upper_ctrl_var) room_gpm.addLConstr(flux_var - flux_upper_ctrl_var * (flux_ub - wu_flux), sense=gp.GRB.LESS_EQUAL, rhs=wu_flux) room_gpm.setObjective(gp.quicksum(flux_ctrl_vars), gp.GRB.MINIMIZE) room_gpm.optimize() room_solution = self._gp_get_solution(room_gpm) room_gpm.close() return room_solution
def __del__(self): if self.is_gpm and self.gpm is not None: self.gpm.close()