import os
from typing import List, Union
from enum import Enum
import urllib3
import zstandard
import ncbi_taxon_db
try:
import ncbi_genbank_accession_db as accession_db
except ImportError:
import ncbi_refseq_accession_db as accession_db
try:
import ncbi_genbank_accession_lengths as accession_lengths
except ImportError:
import ncbi_refseq_accession_lengths as accession_lengths
try:
import ncbi_genbank_accession_offsets as accession_offsets
except ImportError:
import ncbi_refseq_accession_offsets as accession_offsets
from .const import blast_db_timestamp
from .util import TwoBitDecoder
from .version import __version__ # noqa
from .vendored.marisa_trie import RecordTrie
Rank = Enum(
"Rank",
("biotype clade class cohort family forma forma_specialis genotype genus infraclass infraorder isolate kingdom "
"morph order parvorder pathogroup phylum section series serogroup serotype species species_group species_subgroup "
"strain subclass subcohort subfamily subgenus subkingdom suborder subphylum subsection subspecies subtribe "
"subvariety superclass superfamily superkingdom superorder superphylum tribe varietas no_rank")
)
BLASTDatabase = Enum("BLASTDatabase",
("ref_viruses_rep_genomes ref_prok_rep_genomes ref_euk_rep_genomes Betacoronavirus nt"))
[docs]class TaxoniqException(Exception):
pass
[docs]class NoValue(TaxoniqException):
pass
class DatabaseService:
_databases = {}
def _get_db(self, db_name):
if db_name not in self._databases:
db_type, filename = self._db_files[db_name]
if db_type == zstandard:
with open(filename, "rb") as fh:
self._databases[db_name] = zstandard.decompress(fh.read())
else:
self._databases[db_name] = db_type.mmap(filename)
return self._databases[db_name]
class ItemAttrAccess:
def __getitem__(self, item):
return getattr(self, item)
[docs]class Accession(DatabaseService, ItemAttrAccess):
"""
An object representing an NCBI GenBank nucleotide or protein sequence accession ID.
This is used by Taxoniq to represent sequences associated with taxons; use :class:`Taxon` as the starting point.
"""
_db_files = {
"accessions": (RecordTrie("IH"), accession_db.db),
"accession_offsets": (RecordTrie("I"), accession_offsets.db),
"accession_lengths": (RecordTrie("I"), accession_lengths.db)
}
http = urllib3.PoolManager(maxsize=min(32, os.cpu_count() + 4))
s3_host = "ncbi-blast-databases.s3.amazonaws.com"
def __init__(self, accession_id: str):
self.accession_id = accession_id
self._packed_id = self._pack_id(accession_id)
self._tax_id = None
self._blast_db, self._blast_db_volume, self._db_offset, self._length = None, None, None, None
def _load_accession_data(self):
self._tax_id, db_info = self._get_db("accessions")[self._packed_id][0]
self._blast_db = BLASTDatabase(db_info >> 8)
self._blast_db_volume = db_info & 0xff
@property
def tax_id(self):
"""
The taxon ID associated with this sequence accession ID.
"""
if self._tax_id is None:
self._load_accession_data()
return self._tax_id
@property
def blast_db(self):
"""
The BLAST database in which this sequence accession ID was indexed.
"""
if self._blast_db is None:
self._load_accession_data()
return self._blast_db
@property
def blast_db_volume(self):
"""
The numeric BLAST database volume ID in which this sequence accession was indexed.
"""
if self._blast_db_volume is None:
self._load_accession_data()
return self._blast_db_volume
@property
def length(self):
"""
The length of the sequence (number of nucleotides or amino acids).
"""
if self._length is None:
self._length = self._get_db("accession_lengths")[self._packed_id][0][0]
return self._length
@property
def db_offset(self):
"""
The byte offset in the BLAST database volume at which this sequence starts.
"""
if self._db_offset is None:
self._db_offset = self._get_db("accession_offsets")[self._packed_id][0][0]
return self._db_offset
def _pack_id(self, accession_id):
if accession_id.endswith(".1"):
accession_id = accession_id[:-len(".1")]
accession_id = accession_id.replace("_", "")
return accession_id
[docs] def get_from_s3(self):
"""
Returns a file-like object streaming the nucleotide sequence for this accession from the AWS S3 NCBI BLAST
database mirror (https://registry.opendata.aws/ncbi-blast-databases/), if available.
"""
blast_db = f"{self.blast_db.name}.{str(self.blast_db_volume).rjust(2, '0')}"
s3_url = f"https://{self.s3_host}/{blast_db_timestamp}/{blast_db}.nsq"
headers = {"Range": f"bytes={self.db_offset}-{self.db_offset + (self.length // 4)}"}
res = self.http.request("GET", s3_url, headers=headers, preload_content=False)
res._decoder = TwoBitDecoder(self.length)
return res
[docs] def get_from_gs(self):
"""
Returns a file-like object streaming the nucleotide sequence for this accession from the Google Storage NCBI
BLAST database mirror (https://registry.opendata.aws/ncbi-blast-databases/), if available.
"""
raise NotImplementedError()
[docs] def url(self):
"""
Returns the HTTPS URL for the NCBI GenBank web page representing this sequence accession ID.
"""
return f"https://www.ncbi.nlm.nih.gov/nuccore/{self.accession_id}"
def __eq__(self, other):
return self.accession_id == other.accession_id
def __repr__(self):
return "{}.{}('{}')".format(self.__module__, self.__class__.__name__, self.accession_id)
[docs]class Taxon(DatabaseService, ItemAttrAccess):
"""
An object representing an NCBI Taxonomy taxon, identified by its taxon ID. The object can be instantiated by
uniquely identifying a taxon using the numeric taxon ID, an alphanumeric accession ID of a sequence associated with
the taxon ID, or the scientific name of the taxon.
"""
# TODO: more attributes from structured metadata at species/strain level e.g. gc, ploidy, ...
_db_dir = ncbi_taxon_db.db_dir
_db_files = {
"taxa": (RecordTrie("IBBB"), os.path.join(_db_dir, "taxa.marisa")),
"wikidata": (RecordTrie("I"), os.path.join(_db_dir, "wikidata.marisa")),
"sn2t": (RecordTrie("I"), os.path.join(_db_dir, "sn2taxid.marisa")),
}
_string_index_names = ("scientific_name", "common_name", "taxid2refrep", "taxid2refseq", "description",
"en_wiki_title", "child_nodes", "host")
for _string_index in _string_index_names:
_db_files[_string_index] = (zstandard, os.path.join(_db_dir, _string_index + ".zstd"))
_db_files[_string_index + "_pos"] = (RecordTrie("I"), os.path.join(_db_dir, _string_index + ".marisa"))
common_ranks = {
Rank[i] for i in ("species", "genus", "family", "order", "class", "phylum", "kingdom", "superkingdom")
}
def __init__(self, tax_id: int = None, accession_id: str = None, scientific_name: str = None):
if sum(x is not None for x in (tax_id, accession_id, scientific_name)) != 1:
raise TaxoniqException("Expected exactly one of tax_id, accession_id, or scientific_name to be set")
if tax_id is not None:
self.tax_id = tax_id
elif accession_id is not None:
self.tax_id = Accession(accession_id).tax_id
elif scientific_name is not None:
self.tax_id = self._get_db("sn2t")[scientific_name][0][0]
self._parent, rank, self.division_id, self.specified_species = self._get_db("taxa")[str(self.tax_id)][0]
self._rank = Rank(rank)
self._str_attr_cache = {}
def _get_str_attr(self, attr_name):
if attr_name not in self._str_attr_cache:
pos_db = self._get_db(attr_name + "_pos")
str_db = self._get_db(attr_name)
try:
pos = pos_db[str(self.tax_id)][0][0]
except KeyError:
raise NoValue(f'The taxon {self} has no value indexed for "{attr_name}"')
self._str_attr_cache[attr_name] = str_db[pos:str_db.index(b"\n", pos)].decode()
return self._str_attr_cache[attr_name]
@property
def rank(self) -> Rank:
"""
Rank of the taxon. See https://www.ncbi.nlm.nih.gov/pmc/articles/PMC7408187/#sec9title for more details.
"""
return self._rank
@property
def scientific_name(self) -> str:
"""
The unique scientific name of the taxon.
"""
return self._get_str_attr("scientific_name")
@property
def common_name(self) -> str:
'''
Common name of the taxon. In taxoniq, this is defined as the NCBI taxonomy blast name if available, or the
genbank common name if available, or the first listed common name. See
https://www.ncbi.nlm.nih.gov/pmc/articles/PMC3245000/ for definitions of these fields.
'''
return self._get_str_attr("common_name")
@property
def lineage(self) -> 'List[Taxon]':
"""
Lineage for this taxon (the list of parent nodes from the taxon to the root of the taxonomic tree).
"""
lineage = [self]
while lineage[-1].tax_id != 1:
lineage.append(Taxon(lineage[-1]._parent))
return lineage
@property
def ranked_lineage(self) -> 'List[Taxon]':
"""
Lineage of main taxonomic ranks (species, genus, family, order, class, phylum, kingdom, superkingdom).
"""
return list(filter(lambda t: t.rank in self.common_ranks, self.lineage))
@property
def parent(self) -> 'Union[Taxon, None]':
"""
The parent taxon for this taxon.
For the root of the tree, `parent` is `None`.
"""
if self.tax_id == 0:
return None
else:
return Taxon(self._parent)
@property
def child_nodes(self) -> 'List[Taxon]':
"""
Returns a list of taxon objects that list this taxon as their parent.
"""
return [Taxon(int(t)) for t in self._get_str_attr("child_nodes").split(",")]
@property
def ranked_child_nodes(self) -> 'List[Taxon]':
'''
List of child nodes in the next main taxonomic rank (species, genus, family, order, class, phylum, kingdom,
superkingdom).
'''
return list(filter(lambda t: t.rank in self.common_ranks, self.child_nodes))
@property
def description(self) -> str:
'''
Introductory paragraph for this taxon from English Wikipedia, if available.
'''
try:
return self._get_str_attr("description")
except KeyError:
return ""
@property
def best_available_description(self):
'''
Introductory paragraph from English Wikipedia for this taxon or the first parent taxon where a description is
available.
'''
t = self
while t.tax_id != 1:
if t.description:
return t.description
t = t.parent
return ""
@property
def best_refseq_taxon(self):
'''
Best related taxon with refseq representative genome sequence available.
For viruses, this is the RefSeq "genome neighbor" as defined in
https://www.ncbi.nlm.nih.gov/pmc/articles/PMC4383986/ and retrieved from
https://ftp.ncbi.nlm.nih.gov/genomes/Viruses/Viruses_RefSeq_and_neighbors_genome_data.tab.
For other domains, this is the RefSeq representative genome for the taxon's species, if available, as
seen in the species_taxid column of https://ftp.ncbi.nlm.nih.gov/genomes/refseq/assembly_summary_refseq.txt.
The accessions for the genome can be accessed as follows:
Taxon(123).best_refseq_taxon.refseq_representative_genome_accessions
'''
raise NotImplementedError()
@property
def host(self) -> List[str]:
'''
A text description of a symbiont host or hosts for this taxon's organism, if any.
'''
return self._get_str_attr("host").split(",")
@property
def refseq_representative_genome_accessions(self) -> List[Accession]:
"""
A list of :class:`Accession` objects for sequences in the RefSeq representative genome assembly for this
taxon, if available.
"""
return [Accession(i) for i in self._get_str_attr("taxid2refrep").split(",")]
@property
def refseq_genome_accessions(self) -> List[Accession]:
"""
A list of :class:`Accession` objects for sequences in the most recent RefSeq genome assembly for this
taxon, if available.
"""
return [Accession(i) for i in self._get_str_attr("taxid2refseq").split(",")]
[docs] @classmethod
def lca(cls, taxa):
"""
Given a list of Taxon objects, returns the last common ancestor taxon.
"""
raise NotImplementedError()
[docs] @classmethod
def distance(cls, taxa):
'''
Phylogenetic distance between taxa as computed by WoL
'''
raise NotImplementedError()
[docs] def closest_taxon_with_refseq_genome(self):
'''
Returns a taxon closest by phylogenetic distance as computed by WoL and with a refseq genome associated
'''
pass
@property
def url(self):
"""
Returns the HTTPS URL for the NCBI Taxonomy web page representing this taxon.
"""
return f"https://www.ncbi.nlm.nih.gov/Taxonomy/Browser/wwwtax.cgi?mode=Info&id={self.tax_id}"
@property
def wikidata_id(self):
"""
Wikidata ID for this taxon.
"""
wikidata_id = self._get_db("wikidata")[str(self.tax_id)][0][0]
return f"Q{wikidata_id}"
@property
def wikidata_url(self):
'''
URL of the Wikidata web page representing this taxon.
'''
if self.wikidata_id:
return f"https://www.wikidata.org/wiki/{self.wikidata_id}"
def __eq__(self, other):
return self.tax_id == other.tax_id
def __repr__(self):
return "{}.{}({})".format(self.__module__, self.__class__.__name__, self.tax_id)