"""Implementation of Optimize Base Class.
Support Optimization using gurobipy and COBRApy interfaces.
Load SBML coded metabolic model via sbmlxdf
in case of gurobipy mode: Configure gp model:
- variables
- constraints
- objective function
collect data structures required during optimization and results processing
Peter Schubert, HHU Duesseldorf, CCB, June 2024
"""
import re
import os
import time
import numpy as np
import pandas as pd
from collections import defaultdict
import sbmlxdf
import f2xba.prefixes as pf
from f2xba.utils.mapping_utils import get_srefs, parse_reaction_string, valid_sbml_sid
# gurobipy should not be a hard requirement, unless used in this context
try:
import gurobipy as gp
except ImportError:
gp = None
pass
XML_SPECIES_NS = 'http://www.hhu.de/ccb/rba/species/ns'
XML_COMPARTMENT_NS = 'http://www.hhu.de/ccb/rba/compartment/ns'
status2text = {1: 'LOADED', 2: 'OPTIMAL', 3: 'INFEASIBLE', 4: 'INF_OR_UNBD', 5: 'UNBOUNDED',
6: 'CUTOFF', 7: 'ITERATION_LIMIT', 8: 'NODE_LIMIT', 9: 'TIME_LIMIT',
10: 'SOLUTION_LIMIT', 11: 'INTERRUPTED', 12: 'NUMERIC', 13: 'SUBOPTIMAL',
14: 'INPROGRESS', 15: 'USER_OBJ_LIMIT', 16: 'WORK_LIMIT', 17: 'MEM_LIMIT'}
def extract_net_fluxes(fluxes):
"""Extract net fluxes reactions fluxes for optimization solution.
In extended models, reactions get split per direction and isoenzyme.
Here, the fluxes are collected on the level of the original net reaction.
:param fluxes: reaction fluxes from optimization solution
:type fluxes: pandas.Series or None
:return: net fluxes, by combining fluxes of related reactions.
:rtype: pandas.Series or None
"""
if fluxes is not None:
net_fluxes = defaultdict(float)
for rid, flux in fluxes.items():
rid = str(rid)
if re.match('.*_REV$', rid):
net_fluxes[re.sub('_REV$', '', rid)] -= flux
else:
net_fluxes[rid] += flux
return pd.Series(net_fluxes, name='fluxes')
else:
return fluxes
[docs]
class Solution:
"""Structure holding information from optimization result.
Aligned to COBRApy cobra.Solution.
Example: Load and optimize a GECKO model using the gurobipy interface.
.. code-block:: python
from f2xba import EcmOptimization
eo = EcmOptimization('iML1515_GECKO.xml')
eo.medium = {rid: 1000.0 for rid in lb_medium}
solution = eo.optimize()
solution.objective_value
Code for __repr__(), _repr_html_() copied from cobra.Solution.
"""
def __init__(self, status, objective_value=np.nan, fluxes=None, reduced_costs=None, shadow_prices=None):
self.status = status
"""Status string from optimization."""
self.objective_value = objective_value
"""Objective value (float)."""
self.fluxes = fluxes
"""Values of optimization variables (pandas Series)."""
self.reduced_costs = reduced_costs
"""Reduced cost of optimization variables (pandas Series)."""
self.shadow_prices = shadow_prices
"""Shadow prices of constraints (pandas Series)."""
def __repr__(self):
if self.status != 'optimal':
return f'<Solution {self.status} at {id(self):#x}>'
return f'<Solution {self.objective_value:.3f} at {id(self):#x}>'
def _repr_html_(self):
if self.status == 'optimal':
with pd.option_context('display.max_rows', 10):
df = pd.DataFrame({'fluxes': self.fluxes, 'reduced_costs': self.reduced_costs})
html = ('<strong><em>Optimal</em> solution with objective '
f'value {self.objective_value:.3f}</strong><br>{df._repr_html_()}')
else:
html = f'<strong><em>{self.status}</em> solution</strong>'
return html
[docs]
class Optimize:
"""Support optimization of SBML encoded extended models generated by f2xba.
`Optimize` is parent to `FbaOptimization`, `EcmOptimization`
and `RbaOptimization` classes. `Optimize` provides common functionality, including
support for thermodynamics constraint model variants.
Both COBRApy and gurobipy interfaces are supported. Use of gurobipy interface
requires GUROBI and gurobipy being installed on your system. The gurobipy interface
has significant speed advantages, especially for larger amd more complex models.
Example 1: Load and optimize GECKO model using the COBRApy interface.
.. code-block:: python
import cobra
from f2xba import EcmOptimization
ecm = cobra.io.read_sbml_model('iML1515_GECKO.xml')
eo = EcmOptimization('iML1515_GECKO.xml', ecm)
ecm.medium = {rid: 1000.0 for rid in medium}
solution = ecm.optimize()
Example 2: Load and optimize GECKO model using the gurobipy interface.
.. code-block:: python
from f2xba import EcmOptimization
eo = EcmOptimization('iML1515_GECKO.xml')
eo.medium = {rid: 1000.0 for rid in medium}
solution = eo.optimize()
"""
def __init__(self, model_type, fname, cobra_model=None):
"""Instantiate the Optimize base class
This instantiation is invoked by child classes.
The SBML-coded extended model is loaded from file, and information is extracted.
A gurobipy model (gpm) is created, if gurobipy is available and no COBRApy model is provided.
For TD constraint models, thermodynamics related variables and constraints will be reconfigured.
:param str model_type: base type of the model ('FBA', 'ECM' or 'RBA')
:param str fname: filename of the SBML coded extended model
:param cobra_model: reference to cobra model (default: None)
:type cobra_model: :class:`cobra.Model`
"""
self.model_type = model_type
if not os.path.exists(fname):
print(f'Error: {fname} not found!')
raise FileNotFoundError
self.model_name = os.path.basename(fname).split('.')[0]
# load SBML coded metabolic model into sbmlxdf for data extraction
sbml_model = sbmlxdf.Model(fname)
print(f'SBML model loaded by sbmlxdf: {fname} ({time.ctime(os.path.getmtime(fname))})')
self.m_dict = sbml_model.to_df()
"""Information extracted from the SBML encoded model."""
self.gpm = None
"""Instance of gurobipy.Model, in case gurobipy interface is used."""
self.orig_gpm = None
self.gp_neg_var = {}
sbml_parameters = self.m_dict['parameters']['value'].to_dict()
self.avg_enz_saturation = sbml_parameters.get('avg_enz_sat')
"""Average enzyme saturation level configured in ECM and RBA models."""
self.protein_per_gdw = sbml_parameters.get('gP_per_gDW')
"""Total protein per gram cell dry weight (gP/gDW) configured in ECM models."""
self.modeled_protein_mf = sbml_parameters.get('gmodeledP_per_gP')
"""Protein mass fraction explicitly modelled in the ECM."""
self.importer_km = sbml_parameters.get('default_importer_km_value')
"""Default Michaelis Menten constant KM (mmol/l) configured in the RBA model."""
if cobra_model:
self.is_gpm = False
self.model = cobra_model
self._cp_configure_td_model_constraints()
else:
self.is_gpm = True
self.var_id2gpm = {}
self.constr_id2gpm = {}
self.gpm = self._gp_create_model()
self.report_model_size()
self.locus2uid = self._get_locus2uid()
"""Map gene identifier to UniProt protein identifier."""
self.mid2name = {re.sub(f'^{pf.M_}', '', sid): row['name']
for sid, row in self.m_dict['species'].iterrows()}
self.id2groups = self._get_id2groups()
"""Map reaction identifier to group names configured in the SBML model."""
self.rdata_model = self._get_reaction_data()
"""Information related to reactions."""
self.uptake_rids = [rid for rid, rdata in self.rdata_model.items() if rdata['is_exchange'] is True]
"""Exchange reactions used for exchange of metabolites with the environment."""
self.ex_rid2sid = self._get_ex_rid2sid()
self.sidx2ex_ridx = {re.sub(f'^{pf.M_}', '', sid): re.sub(f'^{pf.R_}', '', rid)
for rid, sid in self.ex_rid2sid.items()}
# rdata and net_rdata used in results processing (COBRApy ids, without 'R_', 'M_'
self.rdata = {}
"""Information related to reactions (prefixes `R_` and `M_` removed)."""
for rid, record in self.rdata_model.items():
cp_rdata = record.copy()
cp_rdata['net_rid'] = re.sub(f'^{pf.R_}', '', record['net_rid'])
cp_rdata['reaction_str'] = re.sub(r'\b' + f'{pf.M_}', '', record['reaction_str'])
self.rdata[re.sub(f'^{pf.R_}', '', rid)] = cp_rdata
self.net_rdata = self._extract_net_reaction_data(self.rdata)
"""Information related to net reactions (iso-reactions and fwd/rev directions combined)."""
self.rids_catalyzed = self.get_rids_catalyzed()
"""Map reaction identifier to related genes identifiers."""
# self.uid2locus = {uid: locus for locus, uid in self.locus2uid.items()}
def _get_id2groups(self):
"""Extract Groups component information from SBML model to map ids to group names.
Optional SBML Groups package may contain assignemnt of reaction ids to specific groups.
:return: mapping of model identifies to group names as per GROUPS component
:rtype: dict (key: model id / str, val: list if group names / str)
"""
id2groups = defaultdict(list)
if 'groups' in self.m_dict:
for _, row in self.m_dict['groups'].iterrows():
members_list = row.get('members')
if type(members_list) is str:
members_list = row.get('members', '')
for item in members_list.split(';'):
if '=' in item:
reference_id = item.split('=')[1].strip()
id2groups[reference_id].append(row['name'])
return dict(id2groups)
def _get_locus2uid(self):
"""Extract mapping for gene locus to protein id from SBML model.
For EC models (e.g. GECKO) extract mapping
- from protein concentration variables 'V_PC_xxx', which carry the protein name in the id
- from fbcGeneProdAssoc of such variable extract the (single) gene product id
- from fbcGeneProducts component extract the related 'label' - gene locus.
for RBA models extract mapping from
- from macromolecular coupling constraints 'MM_prot_xxx, which carry the gene locus in the name
- from miriamAnnotation extract protein id, if available (not available for e.g. RNA or dummy protein)
:return: mapping of gene locus to protein identifier
:rtype: dict (key: gene locus ('label' of fbaGeneProducts), val: protein id)
"""
locus2uid = {}
# support for EC based models, e.g. GECKO
for varid, row in self.m_dict['reactions'].iterrows():
if re.match(f'{pf.V_PC_}', varid):
uid = re.sub(f'{pf.V_PC_}', '', varid)
gpr = row['fbcGeneProdAssoc']
if type(gpr) is str:
gpid = gpr.split('=')[1]
label = valid_sbml_sid(self.m_dict['fbcGeneProducts'].at[gpid, 'label'])
locus2uid[label] = uid
# support for RBA based models
for constr_id, row in self.m_dict['species'].iterrows():
if re.match(f'{pf.MM_}', constr_id):
uids = sbmlxdf.misc.get_miriam_refs(row['miriamAnnotation'], 'uniprot')
if len(uids) == 1:
label = re.sub(f'{pf.MM_}', '', constr_id)
locus2uid[label] = uids[0]
return locus2uid
def _get_ex_rid2sid(self):
"""Get mapping of cobra exchange reaction ids to RBA uptake metabolites
From XML annotation of compartments identify the compartent for media uptake
Note: RBA allows other compartments than external compartment to
be used in Michaelis Menten saturation terms for medium uptake,
e.g. periplasm (however, this may block reactions in periplasm)
replace compartment postfix of exchanged metabolited by medium cid of RBA model.
Create mapping table from exchange reaction to medium
"""
medium_cid = 'e'
for cid, row in self.m_dict['compartments'].iterrows():
xml_annots = row.get('xmlAnnotation')
xml_attrs = sbmlxdf.misc.extract_xml_attrs(xml_annots, ns=XML_COMPARTMENT_NS)
if 'medium' in xml_attrs and xml_attrs['medium'].lower == 'true':
medium_cid = cid
break
ex_rid2sid = {}
for uptake_rid in self.uptake_rids:
row = self.m_dict['reactions'].loc[uptake_rid]
# ridx = re.sub(f'^{pf.R_}', '', uptake_rid)
sref_str = row['reactants'].split(';')[0]
sid = sbmlxdf.extract_params(sref_str)['species']
mid = sid.rsplit('_', 1)[0]
ex_rid2sid[uptake_rid] = f'{mid}_{medium_cid}'
return ex_rid2sid
[docs]
def report_model_size(self):
"""Print information about the model size.
.. code-block:: python
eo.report_model_size()
"""
n_vars_drg = 0
n_vars_ec = 0
n_vars_pmc = 0
if self.is_gpm:
self.gpm.update()
for var in self.gpm.getVars():
if re.match(pf.V_DRG_, var.VarName):
n_vars_drg += 1
elif re.match(pf.V_EC_, var.VarName):
n_vars_ec += 1
elif re.match(pf.V_PMC_, var.VarName):
n_vars_pmc += 1
model_type = 'MILP' if self.gpm.ismip else 'LP'
print(f'{model_type} Model of {self.gpm.modelname}')
print(f'{self.gpm.numvars} variables, {self.gpm.numconstrs} constraints, '
f'{self.gpm.numnzs} non-zero matrix coefficients')
else:
for rxn in self.model.reactions:
if re.match(pf.V_DRG_, rxn.id):
n_vars_drg += 1
elif re.match(pf.V_EC_, rxn.id):
n_vars_ec += 1
elif re.match(pf.V_PMC_, rxn.id):
n_vars_pmc += 1
build_str = []
if n_vars_ec > 0:
build_str.append(f'{n_vars_ec} enzymes')
if n_vars_pmc > 0:
build_str.append(f'{n_vars_pmc} process machines')
if n_vars_drg > 0:
build_str.append(f'{n_vars_drg} TD reaction constraints')
if len(build_str) > 0:
print(', '.join(build_str))
# COBRAPY MODEL RELATED
def _cp_configure_td_model_constraints(self):
"""Re-configure thermodynamics related variables and constraints in COBRApy model.
"""
is_td = False
for var in self.model.variables:
for vprefix in [pf.V_FU_, pf.V_RU_]:
if re.match(vprefix, var.name) and 'reverse' not in var.name:
is_td = True
var.type = 'binary'
for constr in self.model.constraints:
for cprefix in [pf.C_FFC_, pf.C_FRC_, pf.C_GFC_, pf.C_GRC_, pf.C_SU_]:
if re.match(cprefix, constr.name):
constr.lb = None
if is_td:
print(f'Thermodynamic use variables (V_FU_xxx and V_RU_xxx) as binary')
print(f'Thermodynamic constraints (C_F[FR]C_xxx, C_G[FR]C_xxx, C_SU_xxx) ≤ 0')
# GUROBIPY MODEL CONSTRUCTION RELATED
def _gp_create_model_neg_vars(self):
"""Create and configure a gurobipy model with data from metabolic model.
selected variables get converted to non-negative variables (Not used for now)
- however impact on RBA, ECM, pfba, fba, gp_moma and gp_room yet to be checked
- create empty gp.model:
- add variables (using reaction data from metabolic model)
- construct and implement linear constraints based on reaction string
- construct and implement linear objective
- configure TD binary variables and TD related inequality constraints
"""
# create empty gurobipy model
gpm = gp.Model(self.m_dict['modelAttrs'].id)
# set model parameters
gpm.setParam('OutputFlag', 0)
# COBRApy default Feasibility
gpm.params.FeasibilityTol = 1e-7 # 1e-6 default
gpm.params.OptimalityTol = 1e-7 # 1e-6 default
gpm.params.IntFeasTol = 1e-7 # 1e-5 default
# add variables to the gurobi model, with suppport of binary variables
td_binary_var_prefixes = f'({pf.V_FU_}|{pf.V_RU_})'
td_continuous_var_prefixes = f'({pf.V_DRG_}|{pf.V_DRG0_}|{pf.R_})'
#td_continuous_var_prefixes = f'xxxxx'
self.var_id2gpm = {}
for var_id, row in self.m_dict['reactions'].iterrows():
# configure TD forward/reverse use variables as binary
if re.match(td_binary_var_prefixes, var_id):
self.var_id2gpm[var_id] = gpm.addVar(lb=0.0, ub=1.0, vtype='B', name=var_id)
# make selected continuous variables non-negative by splitting
elif re.match(td_continuous_var_prefixes, var_id):
neg_var_id = f'{var_id}_reverse'
self.gp_neg_var[var_id] = neg_var_id
self.var_id2gpm[var_id] = gpm.addVar(lb=max(0.0, row['fbcLb']),
ub=max(0.0, row['fbcUb']), vtype='C', name=var_id)
self.var_id2gpm[neg_var_id] = gpm.addVar(lb=max(0.0, -row['fbcUb']),
ub=max(0.0, -row['fbcLb']), vtype='C', name=neg_var_id)
# balance continuous variables of the model may assume negative values
else:
self.var_id2gpm[var_id] = gpm.addVar(lb=row['fbcLb'], ub=row['fbcUb'], vtype='C', name=var_id)
# collect constraints with mapping to gp variables
constrs = {}
for var_id, row in self.m_dict['reactions'].iterrows():
reactants = get_srefs(row['reactants'])
products = get_srefs(row['products'])
srefs = {constr_id: -coeff for constr_id, coeff in reactants.items()} | products
for constr_id, coeff in srefs.items():
if constr_id not in constrs:
constrs[constr_id] = self.var_id2gpm[var_id] * coeff
else:
constrs[constr_id] += self.var_id2gpm[var_id] * coeff
# implement splitting of transformed Gibbs energy of reaction variables
if re.match(td_continuous_var_prefixes, var_id):
constrs[constr_id] += self.var_id2gpm[f'{var_id}_reverse'] * -coeff
# adding linear constraints to the model, with support of Thermodynamic constraints
# Some TD modeling constraints are defined as LHS <= 0.0
td_constr_prefixes = f'({pf.C_FFC_}|{pf.C_FRC_}|{pf.C_GFC_}|{pf.C_GRC_}|{pf.C_SU_})'
self.constr_id2gpm = {}
for constr_id, constr in constrs.items():
if re.match(td_constr_prefixes, constr_id):
self.constr_id2gpm[constr_id] = gpm.addLConstr(constr, '<', 0.0, constr_id)
else:
self.constr_id2gpm[constr_id] = gpm.addLConstr(constr, '=', 0.0, constr_id)
# configure optimization objective
df_fbc_objs = self.m_dict['fbcObjectives']
active_odata = df_fbc_objs[df_fbc_objs['active'] == bool(True)].iloc[0]
sense = gp.GRB.MAXIMIZE if 'max' in active_odata['type'].lower() else gp.GRB.MINIMIZE
vars_gpm = []
coeffs = []
for sref_str in sbmlxdf.record_generator(active_odata['fluxObjectives']):
params = sbmlxdf.extract_params(sref_str)
vars_gpm.append(self.var_id2gpm[params['reac']])
coeffs.append(float(params['coef']))
gpm.setObjective(gp.LinExpr(coeffs, vars_gpm), sense)
# update gpm with configuration data
gpm.update()
return gpm
def _gp_create_model(self):
"""Create and configure a gurobipy model with data from metabolic model.
- create empty gp.model:
- add variables (using reaction data from metabolic model)
- construct and implement linear constraints based on reaction string
- construct and implement linear objective
- construct and implement linear objective
- configure TD binary variables and TD related inequality constraints
"""
# create empty gurobipy model
gpm = gp.Model(self.m_dict['modelAttrs'].id)
# set model parameters
gpm.setParam('OutputFlag', 0)
# COBRApy default feasibility
gpm.params.FeasibilityTol = 1e-7 # 1e-6 default
gpm.params.OptimalityTol = 1e-7 # 1e-6 default
gpm.params.IntFeasTol = 1e-7 # 1e-5 default
# add variables to the gurobi model, with suppport of binary variables
self.var_id2gpm = {}
for var_id, row in self.m_dict['reactions'].iterrows():
# configure TD forward/reverse use variables as binary
if re.match(pf.V_FU_, var_id) or re.match(pf.V_RU_, var_id):
self.var_id2gpm[var_id] = gpm.addVar(lb=0.0, ub=1.0, vtype='B', name=var_id)
else:
self.var_id2gpm[var_id] = gpm.addVar(lb=row['fbcLb'], ub=row['fbcUb'], vtype='C', name=var_id)
# collect constraints with mapping to gp variables
constrs = {}
for var_id, row in self.m_dict['reactions'].iterrows():
reactants = get_srefs(row['reactants'])
products = get_srefs(row['products'])
srefs = {constr_id: -coeff for constr_id, coeff in reactants.items()} | products
for constr_id, coeff in srefs.items():
if constr_id not in constrs:
constrs[constr_id] = self.var_id2gpm[var_id] * coeff
else:
constrs[constr_id] += self.var_id2gpm[var_id] * coeff
# adding linear constraints to the model, with support of Thermodynamic constraints
# Some TD modeling constraints are defined as LHS <= 0.0
td_constr_prefixes = f'({pf.C_FFC_}|{pf.C_FRC_}|{pf.C_GFC_}|{pf.C_GRC_}|{pf.C_SU_})'
self.constr_id2gpm = {}
for constr_id, constr in constrs.items():
if re.match(td_constr_prefixes, constr_id):
self.constr_id2gpm[constr_id] = gpm.addLConstr(constr, '<', 0.0, constr_id)
else:
self.constr_id2gpm[constr_id] = gpm.addLConstr(constr, '=', 0.0, constr_id)
# configure optimization objective
df_fbc_objs = self.m_dict['fbcObjectives']
active_odata = df_fbc_objs[df_fbc_objs['active'] == bool(True)].iloc[0]
sense = gp.GRB.MAXIMIZE if 'max' in active_odata['type'].lower() else gp.GRB.MINIMIZE
vars_gpm = []
coeffs = []
for sref_str in sbmlxdf.record_generator(active_odata['fluxObjectives']):
params = sbmlxdf.extract_params(sref_str)
vars_gpm.append(self.var_id2gpm[params['reac']])
coeffs.append(float(params['coef']))
gpm.setObjective(gp.LinExpr(coeffs, vars_gpm), sense)
# update gpm with configuration data
gpm.update()
return gpm
def _get_biomass_rid(self):
"""Extract biomass reaction id from model fbcObjectives.
:return: biomass reaction id (without `R_`)
:rtype: str
"""
df_fbc_objs = self.m_dict['fbcObjectives']
active_odata = df_fbc_objs[df_fbc_objs['active'] == bool(True)].iloc[0]
sref_str = active_odata['fluxObjectives'].split(';')[0]
return re.sub(pf.R_, '', sbmlxdf.extract_params(sref_str)['reac'])
# RETRIEVING DATA REQUIRED FOR RESULTS ANALYSIS
@staticmethod
def _get_reaction_str(rdata):
"""Generate the reactions string from a reaction object.
e.g. for PFK_iso1: 'M_2pg_c => M_adp_c + M_fdp_c + M_h_c'
e.g. for PFK_iso1_REV: 'M_adp_c + M_fdp_c + M_h_c => M_2pg_c'
Exclude constraints (starting with `C_`)
:param dict(str, dict) rdata: reaction data
:return: reactions sting
:rtype: str
"""
lparts = []
rparts = []
for sid, stoic in get_srefs(rdata['reactants']).items():
if re.match(pf.C_, sid) is None:
# drop stoichiometric coefficients that are 1.0
stoic_str = f'{round(stoic, 4)} ' if stoic != 1.0 else ''
lparts.append(f'{stoic_str}{sid}')
for sid, stoic in get_srefs(rdata['products']).items():
if re.match(pf.C_, sid) is None:
stoic_str = f'{round(stoic, 4)} ' if stoic != 1.0 else ''
rparts.append(f'{stoic_str}{sid}')
dirxn = ' -> ' if rdata['reversible'] else ' => '
return ' + '.join(lparts) + dirxn + ' + '.join(rparts)
def _expand_pseudo_metabolite(self, rid, reaction_str):
"""Expand pseudo metabolite in reaction string of iso reaction.
Pseudo metabolites `pmet_` are used, when `_arm` reactions are introduced,
e.g. in GECKO models. '_arm' reactions combine fluxes of iso-reactions.
Iso reactions are connected via pseudo metabolites to the arm reaction
E.g. reversible iML1515 reaction R_FBA in GECKO model
FBA_arm, FBA_iso1, FBA_iso2
FBA_arm_REV, FBA_iso1_REV, FBA_iso2_REV
:param str rid: iso reaction idenitifer
:param str reaction_str: reaction string
:return: reactions string with metabolite
:return: str
"""
arm_rid = re.sub(r'_iso\d+', '_arm', rid)
arm_rdata = self.m_dict['reactions'].loc[arm_rid]
parts = re.split(' [-=]> ', self._get_reaction_str(arm_rdata))
expand_pmet = parts[1] if re.search(r'\w*pmet_\w+', parts[0]) else parts[0]
return re.sub(r'\w*pmet_\w+', expand_pmet, reaction_str)
def _get_reaction_compartments(self, reaction_str):
"""From reaction string extract participating compartments
e.g.
reaction_str = 'M_2dmmq8_c + M_h2_c + 2.0 M_h_c => M_2dmmql8_c + 2.0 M_h_p'
return: 'c_p'
:param str reaction_str: species srefs string as per sbmlxdf reactants/products fields in reactions
:return: sorted compartment ids joined by '-'
:rtype: str
"""
cids = set()
srefs = parse_reaction_string(reaction_str)
for idx, r_side in enumerate(['reactants', 'products']):
for sref_str in sbmlxdf.record_generator(srefs[r_side]):
params = sbmlxdf.extract_params(sref_str)
sid = params['species']
cids.add(self.m_dict['species'].loc[sid].compartment)
return '-'.join(sorted(cids))
def _get_reaction_data(self):
"""Extract reaction related data from reactions (reaction string, gpr)
Supports ARM reactions (e.g. GECKO model) by expanding pseudo metabolites.
Data can be used in augmenting flux related optimization results
:return: reaction related data, indexed by reaction id
:rtype: dict
"""
gpid2label = self.m_dict['fbcGeneProducts']['label'].to_dict()
rdatas = {}
for rid, row in self.m_dict['reactions'].iterrows():
if re.match(pf.V_, rid) is None and re.search('_arm', rid) is None:
drxn = 'rev' if re.search('_REV$', rid) else 'fwd'
fwd_rid = re.sub('_REV$', '', rid)
net_rid = re.sub(r'_iso\d+', '', fwd_rid)
groups = '; '.join(self.id2groups.get(rid, ''))
reaction_str = self._get_reaction_str(row)
if re.search(r'pmet_\w+', reaction_str) is not None:
reaction_str = self._expand_pseudo_metabolite(rid, reaction_str)
# translate gene product ids to gene labels
parts = []
if type(row['fbcGeneProdAssoc']) is str:
assoc = re.sub('assoc=', '', row['fbcGeneProdAssoc'])
for item in re.findall(r'\b\w+\b', assoc):
if item in gpid2label:
parts.append(gpid2label[item])
elif item in {'and', 'or'}:
parts.append(item)
gpr = ' '.join(parts)
# mpmf coupling used in GECKO models for fitting kcats to proteomics (not required for RBA)
mpmf_coupling = {}
if gpr != '':
loci = [item.strip() for item in gpr.split(' and ')]
reactants = get_srefs(row['reactants'])
for locus in loci:
locus = valid_sbml_sid(locus)
# note: FBA/TFA models have not data in locus2uid
if locus in self.locus2uid:
ecm_coupling_constr_id = f'{pf.C_prot_}{self.locus2uid[locus]}'
if ecm_coupling_constr_id in reactants:
mpmf_coupling[locus] = reactants[ecm_coupling_constr_id]/self.protein_per_gdw
exchange = False if type(row['products']) is str else True
rp_cids = self._get_reaction_compartments(reaction_str)
r_type = 'transport' if '-' in rp_cids else 'metabolic'
rdatas[rid] = {'net_rid': net_rid, 'drxn': drxn, 'reaction_str': reaction_str,
'gpr': gpr, 'mpmf_coupling': mpmf_coupling,
'is_exchange': exchange, 'r_type': r_type, 'compartment': rp_cids,
'groups': groups}
return rdatas
@staticmethod
def _extract_net_reaction_data(rdata):
"""Extract net reaction data, i.e. unsplit reactions
Net reaction is a reaction with reaction string in forward direction
Reversibility arrow is adjusted if there exists a corresponding reaction in reverse
Gene Product Associations are combined using 'or' for iso-reactions
:param dict(dict) rdata: reaction information
:return: reaction data with isoreactions combined
:rtype: dict(dict)
"""
# determine 'net' reaction data (i.e. unsplit the reaction)
rev_net_rids = {record['net_rid'] for rid, record in rdata.items() if record['drxn'] == 'rev'}
net_rdata = {}
for rid, record in rdata.items():
if record['drxn'] == 'fwd':
net_rid = record['net_rid']
if net_rid not in net_rdata:
reaction_str = record['reaction_str']
if net_rid in rev_net_rids:
reaction_str = re.sub('=>', '->', reaction_str)
net_rdata[net_rid] = {'reaction_str': reaction_str, 'gpr': record['gpr'],
'groups': record['groups']}
else:
net_rdata[net_rid]['gpr'] += ' or ' + record['gpr']
return net_rdata
def get_rids_catalyzed(self):
"""Get reactions that are catalyzed with enzyme composition (gene labels).
Data can be used to assess impact of gene deletions
Data can be used to identify reactions catalyzed by a specific gene
Note: does not support 'or' in alternative components of enzyme complex like (b1234 or b2346) and b2222
Workaround: rearrange gpr: (b1234 and b2222) or (b2346 and b2222)
:meta private:
:return: enzymes and their compositions of catalyzed reactions
:rtype: dict (key: rid/str, val: list of sets of gene labels)
"""
gpid2label = self.m_dict['fbcGeneProducts']['label'].to_dict()
rids_catalyzed = {}
for rid, rdata in self.m_dict['reactions'].iterrows():
if re.match(pf.V_, rid) is None:
enzymes = []
if type(rdata['fbcGeneProdAssoc']) is str:
assoc = re.sub('assoc=', '', rdata['fbcGeneProdAssoc'])
for enz_str in assoc.split('or'):
labels = set()
for gpid in re.findall(r'\b\w+\b', enz_str):
if gpid in gpid2label:
labels.add(gpid2label[gpid])
enzymes.append(labels)
if len(enzymes) > 0:
rids_catalyzed[rid] = enzymes
return rids_catalyzed
# SUPPORT FUNCTIONS FOR GENE DELETION STUDY
[docs]
def get_rids_blocked_by(self, genes):
"""Identify reactions that get blocked when deleting selected genes.
Example: Identify reactions that are blocked by the strain background.
.. code-block:: python
strain_background = {'b0063', 'b0062', 'b0061', 'b3904', 'b3903', 'b3902', 'b0344'}
blocked_rids = eo.get_rids_blocked_by(strain_background)
:param genes: gene or genes
:type genes: str or list(str)
:return: reaction ids affected by gene deletions
:return: list(str)
"""
if type(genes) is str:
genes = {genes}
elif type(genes) is list:
genes = set(genes)
assert type(genes) is set, 'genes must be of type str of a set/list of str'
blocked_rids = []
for rid, enzymes in self.rids_catalyzed.items():
impacted = True
for enzyme in enzymes:
if len(genes.intersection(enzyme)) == 0:
impacted = False
break
if impacted:
blocked_rids.append(rid)
return blocked_rids
# OPTIMIZATION RELATED
[docs]
def get_variable_bounds(self, var_ids):
"""Retrieve upper/lower bounds of selected model variables.
Bounds can be queried for a single variable or a list of variables.
Example: Retrieve variable bounds for single variable, list of variables.
.. code-block:: python
bounds_1 = eo.get_variable_bounds('V_PC_total')
bounds_2 = eo.get_variable_bounds(['POR5_iso1', 'POR5_iso1_REV'])
:param var_ids: variable identifier or a list of variable identifiers
:type var_ids: str or list(str)
:return: variable ids with respective lower und upper bounds
:rtype: dict(str, tuple)
"""
if type(var_ids) is str:
var_ids = [var_ids]
variable_bounds = {}
if self.is_gpm:
self.gpm.update()
# support splitting of variables in two non-negative variables.
for var_idx in var_ids:
var_id = f'{pf.R_}{var_idx}' if f'{pf.R_}{var_idx}' in self.var_id2gpm else var_idx
assert var_id in self.var_id2gpm, f'{var_id} not found'
var = self.var_id2gpm[var_id]
if var_id in self.gp_neg_var:
neg_var = self.var_id2gpm[self.gp_neg_var[var_id]]
ub = var.ub if neg_var.lb == 0.0 else -neg_var.lb
lb = -neg_var.ub if var.lb == 0.0 else var.lb
else:
ub = var.ub
lb = var.lb
variable_bounds[var_idx] = (lb, ub)
else:
for var_id in var_ids:
assert var_id in self.model.reactions, f'{var_id} not found'
variable_bounds[var_id] = self.model.reactions.get_by_id(var_id).bounds
return variable_bounds
[docs]
def set_variable_bounds(self, variable_bounds):
"""Set upper/lower bounds of selected model variables.
Bound values set to 'None' will not be modified.
Example: Block iso-reaction POR5_iso1 in forward and reverse directions. Only upper bounds
need to be changed to 0.0.
.. code-block:: python
orig_bounds = eo.set_variable_bounds({'POR5_iso1': (None, 0.0), 'POR5_iso1_REV': (None, 0.0)})
:param dict(str, tuple) variable_bounds: variable identifiers with tuple of lower and upper bounds
:return: original bounds
:rtype: dict(str, tuple)
"""
orig_bounds = self.get_variable_bounds(list(variable_bounds))
if self.is_gpm:
self.gpm.update()
# support splitting of variables in two non-negative variables.
for var_idx, (lb, ub) in variable_bounds.items():
var_id = f'{pf.R_}{var_idx}' if f'{pf.R_}{var_idx}' in self.var_id2gpm else var_idx
assert var_id in self.var_id2gpm, f'{var_id} not found'
var = self.var_id2gpm[var_id]
if var_id in self.gp_neg_var:
neg_var = self.var_id2gpm[self.gp_neg_var[var_id]]
if ub is not None:
if ub >= 0.0:
var.ub = ub
if neg_var.lb > 0.0:
neg_var.lb = 0.0
else:
neg_var.lb = -ub
if var.ub > 0.0:
var.ub = 0.0
if lb is not None:
if lb >= 0.0:
var.lb = lb
if neg_var.ub > 0.0:
neg_var.ub = 0.0
else:
neg_var.ub = -lb
if var.lb > 0.0:
var.lb = 0.0
else:
if lb is not None:
var.lb = lb
if ub is not None:
var.ub = ub
else:
for var_id, (lb, ub) in variable_bounds.items():
assert var_id in self.model.reactions, f'{var_id} not found'
rxn = self.model.reactions.get_by_id(var_id)
# orig_bounds[var_id] = rxn.bounds
if (lb is not None) and (ub is not None):
rxn.bounds = (lb, ub)
elif lb is not None:
rxn.lower_bound = lb
elif ub is not None:
rxn.upper_bound = ub
return orig_bounds
[docs]
def get_objective(self):
"""Retrieve model optimization objective.
Example: Query optimization objective of the model.
.. code-block:: python
eo.get_objective()
:return: variable coefficients and optimization direction
:rtype: dict(str, float), str
"""
if self.is_gpm:
lin_expr = self.gpm.getObjective()
objective = {}
for idx in range(lin_expr.size()):
var_id = lin_expr.getVar(idx).VarName
objective[re.sub(pf.R_, '', var_id)] = lin_expr.getCoeff(idx)
direction = {gp.GRB.MINIMIZE: 'min', gp.GRB.MAXIMIZE: 'max'}[self.gpm.ModelSense]
else:
objective = {}
sympy_expr = self.model.solver.objective.expression
for var, coeff in sympy_expr.as_coefficients_dict().items():
varname = var.name
if '_reverse_' not in varname:
objective[varname] = coeff
direction = self.model.objective.direction
return objective, direction
[docs]
def set_objective(self, objective, direction='max'):
"""Set optimization objective.
Example: Modify the optimization objective of a model.
.. code-block:: python
eo.set_objective({'BIOMASS_Ec_iML1515_WT_75p37M': 1.0}, 'max')
:param dict(str, float) objective: variable identifiers with coefficients
:param str direction: direction of optimization: 'min' or 'max' (default: 'max')
"""
if self.is_gpm:
sense = gp.GRB.MAXIMIZE if 'max' in direction.lower() else gp.GRB.MINIMIZE
variables = []
coeffs = []
for var_id, coeff in objective.items():
if var_id not in self.var_id2gpm:
var_id = f'{pf.R_}{var_id}'
assert var_id in self.var_id2gpm
variables.append(self.var_id2gpm[var_id])
coeffs.append(coeff)
self.gpm.setObjective(gp.LinExpr(coeffs, variables), sense)
self.gpm.update()
else:
self.model.objective = {self.model.reactions.get_by_id(var_id): coeff
for var_id, coeff in objective.items()}
self.model.objective_direction = direction
[docs]
def update_relaxation(self, fluxes, eps=0.5):
"""TFA slack model relaxation for a specified flux distribution.
In case optimizations of a model with added thermodynamics constraints leads
to infeasable solutions, model relaxation is required. Model relaxation is performed on a
TFA model with slack variables configured for ∆rG'˚ (standard transformed Gibbs energy of reaction)
variables. Use TfaModel.export_slack_model() to generate such a model.
This method is used to determine adjusted ∆rG'˚ variable bounds that make the TD model feasible
and to configure the updated bounds to support optimization of further conditions.
Example 1: Perform TFA model relaxation using COBRApy interface. With the additional constraint
of achieving at least 95% of the growth rate of the model without TD constraints.
.. code-block:: python
import cobra
from f2xba import FbaOptimization
tfa_slack_model = cobra.io.read_sbml_model('iML1515_TFA_slack.xml')
fo = FbaOptimization('iML1515_TFA_slack.xml', tfa_slack_model)
tfa_slack_model.reactions.get_by_id('BIOMASS_Ec_iML1515_core_75p37M').lower_bound = 0.95 * wt_gr
tfa_slack_model.medium = medium
solution = tfa_slack_model.optimize()
drg0_relaxations = fo.update_relaxation(solution.fluxes)
Example 2: Perform TFA model relaxation using cobrapy interface.
.. code-block:: python
from f2xba import FbaOptimization
fo = FbaOptimization('iML1515_TFA_slack.xml')
orig_bounds = fo.set_variable_bounds({'BIOMASS_Ec_iML1515_core_75p37M':(0.95 * wt_gr, None)})
fo.medium = medium
solution = fo.optimize()
fo.set_variable_bounds(orig_bounds)
drg0_relaxations = fo.update_relaxation(solution.fluxes)
:param dict fluxes: fluxes of a TFA slack model optimization solution
:param float eps: additional margin in kJ/mol (default: 0.5)
:return: ∆rG'˚ variables with updated bounds.
:rtype: dict(str, dict(str,float)
"""
# collect DRG0 variables that need relaxation
slack_vars = {}
for var_id, val in fluxes.items():
if re.match(pf.V_NS_, var_id) and val > 1e-7:
slack_vars[re.sub(f'^{pf.V_NS_}', pf.V_DRG0_, var_id)] = -val
elif re.match(pf.V_PS_, var_id) and val > 1e-7:
slack_vars[re.sub(f'^{pf.V_PS_}', pf.V_DRG0_, var_id)] = val
# relax DRG0 variables by adjusting respective bound
drg0_relaxations = {}
update_bounds = {}
for drg0_id, (lb, ub) in self.get_variable_bounds(list(slack_vars)).items():
slack = slack_vars[drg0_id]
if slack < 0.0:
new_lb = lb + slack - eps
update_bounds[drg0_id] = (new_lb, None)
drg0_relaxations[drg0_id] = {'fbc_lower_bound': new_lb}
else:
new_ub = ub + slack + eps
update_bounds[drg0_id] = (None, new_ub)
drg0_relaxations[drg0_id] = {'fbc_upper_bound': new_ub}
self.set_variable_bounds(update_bounds)
return drg0_relaxations
[docs]
def modify_stoic(self, constr_id, var_id, new_val):
"""Modify a stoichiometric coefficient using gurobipy interface.
Stoichiometric coefficients in a COBRApy model can be modified using the COBRApy method
cobra.core.reaction.add_metabolites().
This method supports the modification of a stoichiometric coefficient
when using the gurobipy interface. As model contexts are not supported, the coefficient
should be reset to the original value after optimization.
Example: Remove the cofactor requirement of hemeA in anaerobic conditions.
.. code-block:: python
if 'EX_o2_e' not in medium:
orig_stoic = eo.modify_stoic('M_hemeA_c', 'R_CofactorSynth', 0.0)
eo.optimize()
if 'EX_o2_e' not in medium:
eo.modify_stoic('M_hemeA_c', 'R_CofactorSynth', orig_stoic)
:param str constr_id: constraint identifier, e.g. species id
:param str var_id: variable identifier, e.g. reaction id
:param float new_val: stoichiometric coefficient (negative for consumation)
:return: original stoichiometric coefficient
:rtype: float
"""
if self.is_gpm:
constr = self.gpm.getConstrByName(constr_id)
var = self.gpm.getVarByName(var_id)
if constr is None:
print(f'Constraint with identifier "{constr_id}" not found!')
return None
if var is None:
print(f'Variable with identifier "{var_id}" not found!')
return None
old_val = self.gpm.getCoeff(constr, var)
self.gpm.chgCoeff(constr, var, new_val)
if var_id in self.gp_neg_var:
neg_var = self.gpm.getVarByName(self.gp_neg_var[var_id])
self.gpm.chgCoeff(constr, neg_var, -new_val)
return old_val
else:
print('Method only implemented for gurobipy interface, for COBRApy, use rxn.add_metabolites()')
return None
def set_medium(self, medium):
"""Configure medium in gurobipy model from dictionary with uptake fluxes.
It is recommended to rather assign the medium as in COBRApy (use medium property)
e.g. medium = {'EX_glc__D_e': 10.0, 'EX_o2_e': 20.0, 'EX_ca2_e': 1000.0, ...}
For gurobipy implementation, support splitting of variables in two non-negative variables.
:meta private:
:param dict(str, float) medium: nutrients in medium, i.e. exchange reactions with max uptake rate
"""
if self.is_gpm:
update_bounds = {}
for ex_rid in self.uptake_rids:
ex_ridx = re.sub('^R_', '', ex_rid)
if ex_ridx in medium:
update_bounds[ex_rid] = (-medium[ex_ridx], None)
else:
update_bounds[ex_rid] = (0.0, None)
self.set_variable_bounds(update_bounds)
else:
self.model.medium = medium
[docs]
def optimize(self, alt_model=None):
"""Optimize gurobipy model and return COBRApy compliant solution.
Example: Optimize the model.
.. code-block:: python
eo.optimize()
Thermodynamics constrained models are supported inline with pyTFA package.
:param alt_model: gurobipy model (default: None)
:type alt_model: :class:`gurobipy.Model`
:return: optimization solution
:rtype: :class:`Solution`
"""
model = self.gpm if alt_model is None else alt_model
model.optimize()
solution = self._gp_get_solution(model)
return solution
def _gp_combine_neg_vars(self, data_raw):
"""Combine negative variable values from gurobipy optimization.
:param dict(str, float) data_raw: optimization variables with values.
:return:
"""
data = {}
neg_var_ids = set(self.gp_neg_var.values())
for var_id, val in data_raw.items():
var_idx = re.sub(f'^{pf.R_}', '', var_id)
if var_id not in neg_var_ids:
if var_id in self.gp_neg_var:
data[var_idx] = data_raw[var_id] - data_raw[self.gp_neg_var[var_id]]
else:
data[var_idx] = data_raw[var_id]
return data
def _gp_get_solution(self, alt_model=None):
"""Retrieve optimization solution for gp model.
Solution as per COBRApy solution
Reaction and Metabolites ids without `R_`, `M_` prefix
- status
- objective value
- fluxes
- reduced costs
- shadow prices
:param alt_model: (optional) alternative model (e.g. pFBA model)
:type alt_model: gurobipy.Model, if provided
:return: optimization solution
:rtype: class:`Solution`
"""
model = self.gpm if alt_model is None else alt_model
results_dict = {'status': status2text[model.status].lower(), 'tmp': pd.Series()}
if hasattr(model, 'objval'):
results_dict['objective_value'] = model.objval
if hasattr(model.getVars()[0], 'x'):
data_raw = {var.VarName: var.X for var in model.getVars()}
results_dict['fluxes'] = pd.Series(self._gp_combine_neg_vars(data_raw), name='fluxes')
if hasattr(model.getVars()[0], 'rc'):
data_raw = {var.VarName: var.RC for var in model.getVars()}
results_dict['reduced_costs'] = pd.Series(self._gp_combine_neg_vars(data_raw), name='reduced_costs')
if hasattr(model.getConstrs()[0], 'pi'):
data = {re.sub(f'^{pf.M_}', '', constr.ConstrName): constr.Pi
for constr in model.getConstrs()}
results_dict['shadow_prices'] = pd.Series(data, name='shadow_prices')
del results_dict['tmp']
return Solution(**results_dict)
[docs]
def pfba(self, fraction_of_optimum=1.0):
"""Perform parsimonious FBA optimization (pFBA) on a FBA model using the gurobipy interface.
Example: Execute a pFBA optimization using the gurobipy interface.
.. code-block:: python
solution = eo.pfba()
:param float fraction_of_optimum: factor to scale FBA objective (default: 1.0)
:return: optimization solution
:return: :class:`Solution`
"""
if self.is_gpm is None:
print('Method implemented for gurobipy interface only.')
return None
# determine fba growth rate
fba_solution = self.optimize()
fba_gr = fba_solution.objective_value
pfba_solution = fba_solution.objective_value
if np.isfinite(fba_gr):
# create an pFBA model based on FBA model and make reactions irreversible
pfba_gpm = self.gpm.copy()
for var in pfba_gpm.getVars():
if var.lb < 0:
fwd_col = pfba_gpm.getCol(var)
rev_coeffs = [-fwd_col.getCoeff(idx) for idx in range(fwd_col.size())]
constrs = [fwd_col.getConstr(idx) for idx in range(fwd_col.size())]
rev_col = gp.Column(coeffs=rev_coeffs, constrs=constrs)
pfba_gpm.addVar(lb=0.0, ub=-var.lb, vtype=var.VType, name=f'{var.VarName}_REV', column=rev_col)
var.lb = 0.0
# create temporary constraint for FBA objective
fba_objective = pfba_gpm.getObjective()
pfba_gpm.addLConstr(fba_objective, '=', fba_gr * fraction_of_optimum, 'FBA objective')
# New Objective: minimize sum of all non-negative reactions
pfba_gpm.update()
vars_gpm = [var for var in pfba_gpm.getVars()]
pfba_objective = gp.LinExpr(np.ones(len(vars_gpm)), vars_gpm)
pfba_gpm.setObjective(pfba_objective, gp.GRB.MINIMIZE)
# optimize and collect results (without error handling)
pfba_gpm.optimize()
pfba_solution = self._gp_get_solution(pfba_gpm)
pfba_solution.fluxes = extract_net_fluxes(pfba_solution.fluxes)
pfba_gpm.close()
return pfba_solution
[docs]
def fva(self, rids=None, fraction_of_optimum=1.0):
"""Perform a Flux Variability Analysis (FVA) on a FBA model using the gurobipy interface.
A list of reaction ids can be provided to limit the scope of the FVA analysis.
Example: Execute FVA analyis for a set of reactions using the gurobipy interface.
.. code-block:: python
eo.fva(['CS', 'ME1', 'POR5_iso1', 'POR5_iso1_REV'])
:param list(str) rids: reaction identifiers (without leading `R_`) (default: None)
:param float fraction_of_optimum: scaling of wild type objective value (default: 1.0)
:return: table with minimum and maximum reaction fluxes
:rtype: pandas.DataFrame
"""
if self.is_gpm is None:
print('Method implemented for gurobipy interface only.')
return None
ridx2rid = {re.sub('R_', '', rid): rid
for rid in self.m_dict['reactions'].index if re.match('^R_', rid)}
selected_rids = list(ridx2rid.values()) if rids is None else [ridx2rid[ridx] for ridx in rids]
# determine wildtype growth rate
wt_gr = self.optimize().objective_value
results = {}
if np.isfinite(wt_gr):
# create a temporary FVA model based on FBA model
fva_gpm = self.gpm.copy()
# add wt objective as constraint
wt_objective = fva_gpm.getObjective()
fva_gpm.addLConstr(wt_objective, '=', wt_gr * fraction_of_optimum, 'wild type objective')
fva_gpm.update()
for rid in selected_rids:
var = fva_gpm.getVarByName(rid)
fva_gpm.setObjective(var, gp.GRB.MINIMIZE)
min_flux = self.optimize(fva_gpm).objective_value
fva_gpm.setObjective(var, gp.GRB.MAXIMIZE)
max_flux = self.optimize(fva_gpm).objective_value
results[re.sub('^R_', '', rid)] = [min_flux, max_flux]
fva_gpm.close()
return pd.DataFrame(results.values(), list(results.keys()), columns=['minimum', 'maximum'])
def solve(self, **rba_solve_params):
"""RBA solver, overwritten by RbaOptimization
:meta private:
"""
return pd.DataFrame()
def get_active_genes(self, fluxes):
"""Determine active genes based on a reaction flux distribution.
All genes required for isoenzymes of a reactions carrying flux are collected.
:meta private:
:param dict(str, float) fluxes: reaction fluxes in mmol/gDWh
:return: set of genes potentially required for given reaction flux destribution
:rtype: set
"""
# map reaction ids without 'R_' prefix, as per COBRApy to reaction ids used in the model
ridx2rid = {re.sub(f'^{pf.R_}', '', rid): rid for rid in self.rids_catalyzed}
gene_list = set()
for ridx, flux in fluxes.items():
if abs(flux) > 0.0:
if ridx in ridx2rid:
for enz_comp in self.rids_catalyzed[ridx2rid[ridx]]:
for gene in enz_comp:
gene_list.add(gene)
print(f'{len(gene_list)} genes potentially active given flux distribution.')
return gene_list
[docs]
def gene_knock_outs(self, genes):
"""Block reactions that are catalyzed by specified gene/genes.
Example: Block reactions that depend on the specified gene and perform a model optimization.
.. code-block:: python
orig_bounds = eo.gene_knock_outs('b0025')
solution = eo.optimize()
eo.set_variable_bounds(orig_bounds)
print(f'mutant gr: {solution.objective_value:.3f} h-1')
:param genes: gene identifiers
:type genes: str or list(str)
:return: original reaction bounds (lb, ub)
:rtype: dict(str, tuple)
"""
if type(genes) is str:
genes = [genes]
if self.is_gpm:
update_bounds = {var_id: (0.0, 0.0) for var_id in self.get_rids_blocked_by(genes)}
orig_rid_bounds = self.set_variable_bounds(update_bounds)
else:
orig_rid_bounds = {}
for gene in genes:
assert gene in self.model.genes, f'{gene} not found'
for react in self.model.genes.get_by_id(gene).reactions:
orig_rid_bounds[react.id] = (react.lower_bound, react.upper_bound)
for gene in genes:
self.model.genes.get_by_id(gene).knock_out()
return orig_rid_bounds
[docs]
def moma(self, wt_fluxes, linear=False):
"""Perform a MOMA based optimization using the gurobipy interface.
Supported for FBA and ECM models.
Ref: Segre et al. 2002, Analysis of optimality in natural and perturbed
metabolic networks.
MOMA (`linear` = False):
minimize: ∑ (v - wt)^2
s.t.: S * v = 0; v_lb ≤ v ≤ v_ub
v: disturbed flux distribution; wt: wild type flux distribution.
This requires adding transformed variables `vt` with adjusted bounds, adding new constraints `vt = v - wt`
and minimizing the objective function `∑ vt^2`.
Linear MOMA (`linear` = True):
minimize: ∑ abs(v - wt)
s.t.: S * v = 0; v_lb ≤ v ≤ v_ub
introducing non-negative transformed flux variables and related constraints and minimizing `∑ (vtp + vtn)`
`vtp`: [0 ... (v_ub - wt)] with constraints `vtp ≥ v - wt`
`vtn`: [0 ... (wt - v_lb)] with constraints `vtn ≥ wt - v`
Example: Perform MOMA analysis for a disturbed network, caused by gene deletion.
.. code-block:: python
fo.medium = medium
wt_solution = fo.pfba()
orig_bounds = fo.gene_knock_outs('b0003')
moma_solution = fo.moma(wt_solution.fluxes)
fo.set_variable_bounds(orig_bounds)
print(f'{moma_solution.fluxes['BIOMASS_Ec_iML1515_core_75p37M']:.3f} h-1')
:param dict(str, float) wt_fluxes: wild type flux distribution, e.g. pFBA fluxes
:param bool linear: using quadratic (Euclidean distance) or linear formulation (Manhattan) (default: False)
:return: MOMA determined solution
:rtype: :class:`Solution`
"""
if self.is_gpm is None:
print('Method implemented for gurobipy interface only.')
return pd.DataFrame()
self.gpm.update()
moma_gpm = self.gpm.copy()
tflux_vars = []
for ridx, wt_flux in wt_fluxes.items():
flux_var = moma_gpm.getVarByName(f'R_{ridx}')
if flux_var is None:
flux_var = moma_gpm.getVarByName(ridx)
if not linear:
tflux_lb = flux_var.LB - wt_flux
tflux_ub = flux_var.UB - wt_flux
tflux_var = moma_gpm.addVar(lb=tflux_lb, ub=tflux_ub, vtype=gp.GRB.CONTINUOUS,
name=f'V_MOMA_transformed_flux_{ridx}')
tflux_vars.append(tflux_var)
moma_gpm.addLConstr(flux_var - tflux_var, sense=gp.GRB.EQUAL, rhs=wt_flux)
else:
tfluxp_ub = flux_var.UB - wt_flux
# if wild type flux already at the upper boundary we do not require fluxp variable/constraint
if tfluxp_ub > 0:
tfluxp_var = moma_gpm.addVar(lb=0, ub=tfluxp_ub, vtype=gp.GRB.CONTINUOUS,
name=f'V_MOMA_transformed_fluxp_{ridx}')
tflux_vars.append(tfluxp_var)
moma_gpm.addLConstr(flux_var - tfluxp_var, sense=gp.GRB.LESS_EQUAL, rhs=wt_flux)
tfluxn_ub = wt_flux - flux_var.LB
if tfluxn_ub > 0:
tfluxn_var = moma_gpm.addVar(lb=0, ub=tfluxn_ub, vtype=gp.GRB.CONTINUOUS,
name=f'V_MOMA_transformed_fluxn_{ridx}')
tflux_vars.append(tfluxn_var)
moma_gpm.addLConstr(flux_var + tfluxn_var, sense=gp.GRB.GREATER_EQUAL, rhs=wt_flux)
if not linear:
moma_gpm.setObjective(gp.quicksum([v * v for v in tflux_vars]), gp.GRB.MINIMIZE)
else:
moma_gpm.setObjective(gp.quicksum(tflux_vars), gp.GRB.MINIMIZE)
moma_gpm.optimize()
moma_solution = self._gp_get_solution(moma_gpm)
moma_gpm.close()
return moma_solution
[docs]
def room(self, wt_fluxes, linear=False, delta=0.03, epsilon=1e-3, time_limit=30.0):
"""Perform a ROOM based optimization using the gurobipy interface.
Ref: Shlomi et al., 2005, Regulatory on off minimization of metabolic flux
changes after genetic perturbations
ROOM (`linear` = False):
minimize: ∑ y^i
s.t. S * v = 0; v_lb ≤ v ≤ v_ub
v - y * (v_ub - wt_u) <= wt_u; wt_u = wt + delta * abs(wt) + \epsilon
v - y * (v_lb - wt_l) <= wt_l; wt_l = wt - delta * abs(wt) - \epsilon
v: disturbed flux distribution; wt: wild type flux distribution.
y_i ∈ {0,1}
Linear ROOM (`linear` = True):
0 <= y_i <= 1
Example: Perform ROOM analysis for a disturbed network, caused by gene deletion.
.. code-block:: python
fo.medium = medium
wt_solution = fo.pfba()
orig_bounds = fo.gene_knock_outs('b0003')
room_solution = fo.room(wt_solution.fluxes)
fo.set_variable_bounds(orig_bounds)
print(f'{room_solution.fluxes['BIOMASS_Ec_iML1515_core_75p37M']:.3f} h-1')
:param dict(str, float) wt_fluxes: wild type flux distribution, e.g. pFBA fluxes
:param bool linear: using MILP (False) or relaxed LP (True) formulation (default: False)
:param float delta: relative tolerance range (default: 0.03)
:param float epsilon: absolute tolerance range (default: 1e-3)
:param float time_limit: time limit in seconds (default: 30.0)
:return: ROOM determined solution
:rtype: :class:`Solution`
"""
if self.is_gpm is None:
print('Method implemented for gurobipy interface only.')
return None
self.gpm.update()
room_gpm = self.gpm.copy()
if not linear:
room_gpm.params.timelimit = time_limit
flux_ctrl_vars = []
for ridx, wt_flux in wt_fluxes.items():
flux_var = room_gpm.getVarByName(f'R_{ridx}')
if flux_var is None:
flux_var = room_gpm.getVarByName(ridx)
flux_lb = flux_var.LB
flux_ub = flux_var.UB
if not linear:
wl_flux = wt_flux - abs(wt_flux) * delta - epsilon
wu_flux = wt_flux + abs(wt_flux) * delta + epsilon
else:
wl_flux = wt_flux
wu_flux = wt_flux
if wl_flux > flux_lb:
if not linear:
flux_lower_ctrl_var = room_gpm.addVar(lb=0, ub=1, vtype=gp.GRB.BINARY,
name=f'V_ROOM_wl_control_{ridx}')
else:
flux_lower_ctrl_var = room_gpm.addVar(lb=0.0, ub=1.0, vtype=gp.GRB.CONTINUOUS,
name=f'V_ROOM_wl_control_{ridx}')
flux_ctrl_vars.append(flux_lower_ctrl_var)
room_gpm.addLConstr(flux_var - flux_lower_ctrl_var * (flux_lb - wl_flux),
sense=gp.GRB.GREATER_EQUAL, rhs=wl_flux)
if wu_flux < flux_ub:
if not linear:
flux_upper_ctrl_var = room_gpm.addVar(lb=0, ub=1, vtype=gp.GRB.BINARY,
name=f'V_ROOM_wu_control_{ridx}')
else:
flux_upper_ctrl_var = room_gpm.addVar(lb=0.0, ub=1.0, vtype=gp.GRB.CONTINUOUS,
name=f'V_ROOM_wu_control_{ridx}')
flux_ctrl_vars.append(flux_upper_ctrl_var)
room_gpm.addLConstr(flux_var - flux_upper_ctrl_var * (flux_ub - wu_flux),
sense=gp.GRB.LESS_EQUAL, rhs=wu_flux)
room_gpm.setObjective(gp.quicksum(flux_ctrl_vars), gp.GRB.MINIMIZE)
room_gpm.optimize()
room_solution = self._gp_get_solution(room_gpm)
room_gpm.close()
return room_solution
def __del__(self):
if self.is_gpm and self.gpm is not None:
self.gpm.close()