"""Abstract Base Class for all Sessions."""
from __future__ import annotations
import itertools
import logging
from datetime import datetime
from functools import lru_cache, wraps
from inspect import isclass
from typing import (
TYPE_CHECKING,
Callable,
Dict,
FrozenSet,
Iterable,
Iterator,
List,
Optional,
Set,
Tuple,
Type,
TypeVar,
Union,
)
from rdflib import OWL, RDF, RDFS, SKOS, BNode, Graph, Literal, URIRef
from rdflib.graph import ModificationException, ReadOnlyGraphAggregate
from rdflib.plugins.sparql.processor import SPARQLResult
from rdflib.query import ResultRow
from rdflib.term import Identifier, Node, Variable
from simphony_osp.ontology.annotation import OntologyAnnotation
from simphony_osp.ontology.attribute import OntologyAttribute
from simphony_osp.ontology.entity import OntologyEntity
from simphony_osp.ontology.individual import (
MultipleResultsError,
OntologyIndividual,
ResultEmptyError,
)
from simphony_osp.ontology.namespace import OntologyNamespace
from simphony_osp.ontology.oclass import OntologyClass
from simphony_osp.ontology.parser import OntologyParser
from simphony_osp.ontology.relationship import OntologyRelationship
from simphony_osp.ontology.utils import DataStructureSet, compatible_classes
from simphony_osp.utils import simphony_namespace
from simphony_osp.utils.cache import lru_cache_weak
from simphony_osp.utils.datatypes import (
UID,
AnnotationValue,
AttributeValue,
RelationshipValue,
Triple,
)
# Module-level logger for this file.
logger = logging.getLogger(__name__)

if TYPE_CHECKING:
    from simphony_osp.interfaces.interface import Interface, InterfaceDriver

# Type variable bound to OntologyEntity, used for typed lookups.
ENTITY = TypeVar("ENTITY", bound=OntologyEntity)

# Local aliases for frequently used RDF terms.
RDF_type = RDF.type
OWL_inverseOf = OWL.inverseOf
class Environment:
    """Environment where ontology entities may be created.

    E.g. sessions, containers.
    """

    # ↓ --------------------- Public API --------------------- ↓ #

    @property
    def locked(self) -> bool:
        """Whether the environment is locked or not.

        A locked environment will not be closed when using it as a context
        manager and leaving the context. Useful for setting it as the
        default environment when it is not intended to close it afterwards.
        """
        # Locked when there is at least one internal lock or a user lock.
        return (self._lock + bool(self._user_lock)) > 0

    @locked.setter
    def locked(self, value: bool):
        """Lock or unlock an environment."""
        self._user_lock = value

    def __enter__(self):
        """Set this as the default environment."""
        self._stack_default_environment.append(self)
        self._environment_references.add(self)
        return self

    def __exit__(self, *args):
        """Set the default environment back to the previous default."""
        if self is not self._stack_default_environment[-1]:
            raise RuntimeError(
                "Trying to exit the an environment context "
                "manager which was not the last entered one."
            )
        self._stack_default_environment.pop()
        # Close the environment unless it is still on the default stack
        # (nested use), has dependents, or is locked.
        if (
            self not in self._stack_default_environment
            and not self.subscribers
            and not self.locked
        ):
            self.close()
        return False

    def close(self):
        """Close this environment.

        Closing an environment also closes all the environments that
        subscribed to it.
        """
        # Fix: the previous implementation removed elements from
        # `self.subscribers` while iterating over the same set, which
        # raises "RuntimeError: Set changed size during iteration".
        # Popping elements one by one avoids mutating a live iterator.
        while self.subscribers:
            self.subscribers.pop().close()
        self._environment_references.remove(self)

    def __bool__(self) -> bool:
        """Evaluate the truth value of the environment.

        Such value is always true.
        """
        return True

    # ↑ --------------------- Public API --------------------- ↑ #

    # Annotations below are quoted so that they never need to be evaluated
    # at class-creation time (e.g. `Environment` itself is not yet bound
    # while its body executes).
    _session_linked: Optional["Session"] = None

    _stack_default_environment: "List[Environment]" = []

    _environment_references: "Set[Environment]" = set()

    _lock: int = 0
    """See the docstring of `locked` for an explanation of what locking an
    environment means."""

    _user_lock: bool = False
    """See the docstring of `locked` for an explanation of what locking an
    environment means.

    This property manages locks performed by the user (by setting the
    `locked` attribute).
    """

    def lock(self):
        """Increase the lock count.

        This way of locking is not meant to be used by end users, only
        internally within SimPhoNy code, as it allows to lock the
        environment several times, later requiring several unlocks, which is
        unintuitive.

        See the docstring of `locked` for an explanation of what locking an
        environment means.
        """
        self._lock += 1

    def unlock(self):
        """Decrease the lock count.

        This way of unlocking is not meant to be used by end users, only
        internally within SimPhoNy code, as it is the counterpart of
        `lock`, which allows to lock the environment several times,
        later requiring several unlocks, which is unintuitive.

        See the docstring of `locked` for an explanation of what locking an
        environment means.
        """
        # Never let the counter drop below zero.
        self._lock = self._lock - 1 if self._lock > 0 else 0

    _subscribers: "Set[Environment]"
    """A private attribute is used in order not to interfere with the
    `__getattr__`method from OntologyIndividual."""

    def __init__(self, *args, **kwargs):
        """Initialize the environment with an empty set of subscribers."""
        self._subscribers = set()
        super().__init__(*args, **kwargs)

    @property
    def subscribers(self) -> "Set[Environment]":
        """Environments that depend on this instance.

        Such environments will be closed when this instance is closed.
        """
        return self._subscribers

    @subscribers.setter
    def subscribers(self, value: "Set[Environment]"):
        """Setter for the private `_subscribers` attribute."""
        self._subscribers = value

    @classmethod
    def get_default_environment(cls) -> Optional["Environment"]:
        """Return the default environment.

        The default environment is the most recently entered one, or
        `None` when no environment is currently entered.
        """
        stack = cls._stack_default_environment
        return stack[-1] if stack else None
class SessionSet(DataStructureSet):
    """A set interface to a session.

    This class looks like and acts like the standard `set`, but it is an
    interface to the methods from `Session` that manage the addition and
    removal of individuals.
    """

    _session: "Session"

    def __init__(
        self,
        session: Optional["Session"] = None,
        oclass: Optional[OntologyClass] = None,
        uids: Optional[Iterable[UID]] = None,
    ):
        """Fix the linked session, the class and the identifier filter."""
        if oclass is not None and not isinstance(oclass, OntologyClass):
            raise TypeError(
                "Found object of type %s passed to argument "
                "oclass. Should be an OntologyClass." % type(oclass)
            )
        uids = tuple(uids) if uids is not None else None
        if uids is not None:
            for uid in uids:
                if not isinstance(uid, UID):
                    raise TypeError(
                        "Found object of type %s. Should be an UID."
                        % type(uid)
                    )
        self._class_filter = oclass
        self._uid_filter = uids
        self._session = session or Session.get_default_session()
        super().__init__()

    def __iter__(self):
        """The entities contained in the session."""
        identifiers = self._uid_filter
        class_ = self._class_filter
        if identifiers:
            yielded = set()
            for entity in self._iter_identifiers():
                # Fix: `yielded` was never populated, so the
                # de-duplication was dead code and `None` placeholders
                # (missing/filtered identifiers) leaked out of the set.
                if entity is not None and entity not in yielded:
                    yielded.add(entity)
                    yield entity
        elif class_:
            # Delegate the class filter to SPARQL; `ontology=True` so that
            # `rdfs:subClassOf` paths can be resolved against the T-Box.
            yield from (
                row[0]
                for row in self._session.sparql(
                    f"""
                    SELECT DISTINCT ?entity WHERE {{
                        ?entity rdf:type/rdfs:subClassOf*
                            <{class_.iri}> .
                    }}
                    """,
                    ontology=True,
                )(entity=OntologyIndividual)
            )
        else:
            yield from iter(self._session)

    def __contains__(self, item: OntologyIndividual) -> bool:
        """Check whether an ontology entity belongs to the session."""
        return item in self._session and (
            item.is_a(self._class_filter) if self._class_filter else True
        )

    def update(self, other: Iterable[OntologyIndividual]) -> None:
        """Update the set with the union of itself and others."""
        other = set(other)
        if self._class_filter:
            for individual in other:
                if not individual.is_a(self._class_filter):
                    raise RuntimeError(
                        f"Cannot update {self} with {individual} because it "
                        f"does not belong to class {self._class_filter}."
                    )
        self._session.add(
            *other,
            merge=True,
            exists_ok=True,
        )

    def intersection_update(
        self, other: Iterable[OntologyIndividual]
    ) -> None:
        """Update the set with the intersection of itself and another."""
        # Keep only the individuals that already exist in the session.
        intersection = {
            x
            for x in other
            if (x.identifier, RDF.type, None) in self._session.graph
        }
        if self._class_filter:
            for individual in intersection:
                if not individual.is_a(self._class_filter):
                    raise RuntimeError(
                        f"Cannot update {self} with {individual} because it "
                        f"does not belong to class {self._class_filter}."
                    )
        existing = {
            x.identifier
            for x in self._session.get(oclass=self._class_filter)
        }
        remove = existing - {x.identifier for x in intersection}
        self._session.add(
            *intersection,
            merge=True,
            exists_ok=True,
        )
        self._session.delete(remove)

    def difference_update(self, other: Iterable[OntologyIndividual]) -> None:
        """Remove all elements of another set from this set."""
        other = set(other)
        exists = set()
        for entity in other:
            try:
                exists.add(
                    self._session.from_identifier_typed(
                        entity.identifier, typing=OntologyIndividual
                    )
                )
            except KeyError:
                # Entities not present in the session are simply ignored.
                pass
        if self._class_filter:
            for individual in exists:
                if not individual.is_a(self._class_filter):
                    raise RuntimeError(
                        f"Cannot delete {individual} because it "
                        f"does not belong to class {self._class_filter}."
                    )
        self._session.delete(exists)

    def symmetric_difference_update(
        self, other: Iterable[OntologyIndividual]
    ) -> None:
        """Update set with the symmetric difference of it and another."""
        other = set(other)
        intersection = {
            x
            for x in other
            if (x.identifier, RDF.type, None) in self._session.graph
        }
        add = other - intersection
        # Precompute the identifiers once instead of re-creating a
        # generator for every membership test (previously quadratic).
        intersection_identifiers = {x.identifier for x in intersection}
        delete = {
            x for x in self if x.identifier in intersection_identifiers
        }
        if self._class_filter:
            for individual in add:
                if not individual.is_a(self._class_filter):
                    raise RuntimeError(
                        f"Cannot add {individual} because it "
                        f"does not belong to class {self._class_filter}."
                    )
            for individual in delete:
                if not individual.is_a(self._class_filter):
                    raise RuntimeError(
                        f"Cannot delete {individual} because it "
                        f"does not belong to class {self._class_filter}."
                    )
        self._session.add(add, merge=False, exists_ok=False)
        self._session.delete(delete)

    def __repr__(self) -> str:
        """Return repr(self)."""
        return (
            set(self).__repr__()
            + " <"
            + (
                f"class {self._class_filter} "
                if self._class_filter is not None
                else ""
            )
            + f"of session {self._session.identifier or self._session}>"
        )

    def one(
        self,
    ) -> OntologyIndividual:
        """Return one element.

        Return one element if the set contains one element, else raise
        an exception.

        Returns:
            The only element contained in the set.

        Raises:
            ResultEmptyError: No elements in the set.
            MultipleResultsError: More than one element in the set.
        """
        iterator = iter(self)
        # `StopIteration` is used as a sentinel that cannot collide with
        # genuine elements of the set.
        first = next(iterator, StopIteration)
        if first is StopIteration:
            raise ResultEmptyError("No elements to be yielded.")
        if next(iterator, StopIteration) is not StopIteration:
            raise MultipleResultsError(
                "More than one element can be yielded."
            )
        return first

    def any(
        self,
    ) -> Optional[Union[AnnotationValue, AttributeValue, RelationshipValue]]:
        """Return any element of the set.

        Returns:
            Any element from the set if the set is not empty, else None.
        """
        return next(iter(self), None)

    def all(self) -> "SessionSet":
        """Return all elements from the set.

        Returns:
            All elements from the set, namely the set itself.
        """
        return self

    def _iter_identifiers(self) -> Iterator[Optional[OntologyIndividual]]:
        """Yield the entity for each filtered UID, `None` for misses."""
        identifiers = self._uid_filter
        class_ = self._class_filter
        # Fix: the previous version unpacked every UID as a pair
        # (`for i, identifier in identifiers`), raising a TypeError as
        # soon as the UID filter was used.
        for identifier in identifiers:
            try:
                entity = self._session.from_identifier_typed(
                    identifier, OntologyIndividual
                )
            except KeyError:
                entity = None
            # Entities that do not satisfy the class filter count as
            # missing.
            if entity and class_ and not entity.is_a(class_):
                entity = None
            yield entity
class Session(Environment):
"""'Box' that stores ontology individuals."""
# ↓ --------------------- Public API --------------------- ↓ #
"""These methods are meant to be available to the end-user."""
identifier: Optional[str] = None
"""A label for the session.
The identifier is just a label for the session to be displayed within
Python (string representation of the session). It has no other effect.
"""
def commit(self) -> None:
    """Persist pending changes to the underlying graph store.

    Also resets the record of entities created since the last commit.
    """
    self._graph.commit()
    self.creation_set = set()
def compute(self, **kwargs) -> None:
    """Run simulations on supported graph stores."""
    from simphony_osp.interfaces.remote.client import RemoteStoreClient

    driver = self._driver
    if driver is not None:
        # Push pending changes so the engine computes on current data.
        self.commit()
        driver.compute(**kwargs)
        return
    store = self._graph.store
    if isinstance(store, RemoteStoreClient):
        # Remote stores expose `compute` as a remotely executed method.
        store.execute_method("compute")
        return
    raise AttributeError(
        f"Session {self} is not attached to a "
        f"simulation engine. Thus, the attribute "
        f"`compute` is not available."
    )
def close(self) -> None:
    """Close the connection to the session's backend.

    Sessions are an interface to a graph linked to an RDFLib store (a
    backend). If the session will not be used anymore, then it makes
    sense to close the connection to such backend to free resources.
    """
    # Refuse to close while this session is an active context manager.
    currently_entered = self in self._stack_default_environment
    if currently_entered:
        raise RuntimeError(
            "Cannot close a session that is currently "
            "being used as a context manager."
        )
    super().close()
    # Pending uncommitted changes are discarded.
    self.graph.close(commit_pending_transaction=False)
def sparql(self, query: str, ontology: bool = False) -> QueryResult:
    """Perform a SPARQL CONSTRUCT, DESCRIBE, SELECT or ASK query.

    By default, the query is performed only on the session's data (the
    ontology is not included).

    Args:
        query: String to use as query.
        ontology: Whether to include the ontology in the query or not.
            When the ontology is included, only read-only queries are
            possible.
    """
    if ontology:
        # Aggregate of data and ontology graphs; read-only by design.
        target = ReadOnlyGraphAggregate([self.graph, self.ontology.graph])
    else:
        target = self.graph
    outcome = target.query(query)
    return QueryResult(
        {
            "type_": outcome.type,
            "vars_": outcome.vars,
            "bindings": outcome.bindings,
            "askAnswer": outcome.askAnswer,
            "graph": outcome.graph,
        },
        session=self,
    )
def __enter__(self):
    """Sets the session as the default session."""
    super().__enter__()
    # Start a fresh record of entities created within this context.
    self.creation_set = set()
    return self
def __exit__(self, exc_type, exc_val, exc_tb):
    """Restores the previous default session."""
    # Delegates closing/unwinding logic to `Environment.__exit__`.
    return super().__exit__(exc_type, exc_val, exc_tb)
def __contains__(self, item: OntologyEntity):
    """Check whether an ontology entity is stored on the session."""
    # An entity belongs to exactly the session it is linked to.
    return self is item.session
def __iter__(self) -> Iterator[OntologyEntity]:
    """Iterate over all the ontology entities in the session.

    Be careful when using this operation, as it can be computationally very
    expensive.
    """
    # Warning: identifiers, and therefore entities, can be repeated.
    for identifier in self.iter_identifiers():
        yield self.from_identifier(identifier)
def __len__(self) -> int:
    """Return the number of ontology entities within the session."""
    count = 0
    for _ in self:
        count += 1
    return count
@lru_cache_weak(maxsize=4096)
# On `__init__.py` there is an option to bypass this cache when the
# session is not a T-Box.
def from_label(
    self,
    label: str,
    lang: Optional[str] = None,
    case_sensitive: bool = False,
) -> FrozenSet[OntologyEntity]:
    """Get an ontology entity from its label.

    Args:
        label: The label of the ontology entity.
        lang: The language of the label.
        case_sensitive: when false, look for similar labels with
            different capitalization.

    Raises:
        KeyError: Unknown label.

    Returns:
        The ontology entity.
    """
    results = set()
    identifiers_and_labels = self.iter_labels(
        lang=lang,
        return_prop=False,
        return_literal=False,
        return_identifier=True,
    )
    if case_sensitive is False:
        # Normalize both the query label and the stored labels.
        comp_label = label.lower()
        identifiers_and_labels = (
            (entity_label.lower(), identifier)
            for entity_label, identifier in identifiers_and_labels
        )
    else:
        comp_label = label
    # Fix: the label comparison must apply in BOTH branches. Previously
    # it was only applied in the case-sensitive branch, so a
    # case-insensitive lookup matched every labelled entity.
    identifiers_and_labels = (
        (entity_label, identifier)
        for entity_label, identifier in identifiers_and_labels
        if entity_label == comp_label
    )
    for _, identifier in identifiers_and_labels:
        try:
            results.add(self.from_identifier(identifier))
        except KeyError:
            # Labels of entities that cannot be retrieved are skipped.
            pass
    if len(results) == 0:
        error = "No element with label %s was found in ontology %s." % (
            label,
            self,
        )
        raise KeyError(error)
    return frozenset(results)
def add(
    self,
    *individuals: Union[OntologyIndividual, Iterable[OntologyIndividual]],
    merge: bool = False,
    exists_ok: bool = False,
    all_triples: bool = False,
) -> Union[OntologyIndividual, FrozenSet[OntologyIndividual]]:
    """Copies ontology individuals to the session.

    Args:
        individuals: Ontology individuals to add to this session.
        merge: Whether to merge individuals with existing ones if their
            identifiers match (read the SimPhoNy documentation for more
            details).
        exists_ok: Merge or overwrite individuals when they already exist
            in the session rather than raising an exception.
        all_triples: When an individual is added to the session, SimPhoNy
            only copies the details that are relevant from an ontological
            point of view: the individual's attributes, the classes it
            belongs to, and its connections to other ontology individuals
            that are also being copied at the same time.

            However, in some cases, it is necessary to keep all the
            information about the individual, even if it cannot be
            understood by SimPhoNy. Set this option to `True` to copy all
            RDF statements describing the individual, that is, all RDF
            statements where the individual is the subject.

            One example of a situation where this option is useful is
            when the individual is attached through an object property to
            another one which is not properly defined (i.e. has no type
            assigned). This situation commonly arises when using the
            `dcat:accessURL` object property.

    Returns:
        The new copies of the individuals.

    Raises:
        RuntimeError: The individual being added has an identifier that
        matches the identifier of an individual that already exists in the
        session.
    """
    # Unpack iterables: bare individuals and iterables of individuals may
    # be mixed in the positional arguments.
    individuals = list(
        individual
        for x in individuals
        for individual in (
            x if not isinstance(x, OntologyIndividual) else (x,)
        )
    )
    # Get the identifiers of the individuals
    identifiers = list(individual.identifier for individual in individuals)
    # Get a list of files within the individuals to add. Only individuals
    # coming from a different session qualify.
    files = {
        individual
        for individual in individuals
        if set(class_.identifier for class_ in individual.superclasses)
        & {simphony_namespace.File}
        and individual.session is not self
    }
    # Paste the individuals
    """The attributes of the individuals are always kept. The
    relationships between the individuals are only kept when they are
    pasted together.
    """
    if (
        any(
            (identifier, None, None) in self.graph
            for identifier in identifiers
        )
        and exists_ok is False
    ):
        raise RuntimeError(
            "Some of the added entities already exist on the session."
        )
    elif (
        merge
        and files
        and any(
            (identifier, None, None) in self.graph
            for identifier in {x.identifier for x in files}
        )
    ):
        # File contents cannot be merged, only replaced.
        raise RuntimeError(
            "Some of the added file entities already exist on the "
            "session. File entities cannot be merged with existing ones."
        )
    # Patterns matching the pre-existing statements about individuals
    # coming from other sessions (lazily evaluated generator).
    delete = (
        (individual.identifier, None, None)
        for individual in individuals
        if individual.session is not self
    )

    @lru_cache(maxsize=4096)
    def is_known(
        p: Node,
    ) -> Optional[
        Union[OntologyAttribute, OntologyRelationship, OntologyAnnotation]
    ]:
        """Check whether a predicate is known in the session's ontology.

        Args:
            p: Predicate to be evaluated.

        Returns:
            The predicate if it is known, `None` if it is not.
        """
        try:
            entity = self.ontology.from_identifier(p)
            if not isinstance(
                entity,
                (
                    OntologyRelationship,
                    OntologyAttribute,
                    OntologyAnnotation,
                ),
            ):
                entity = None
        except KeyError:
            entity = None
        return entity

    def is_valid(
        s: Node, p: Node, o: Node, exception: bool = False
    ) -> bool:
        """Check whether a predicate is known and has a valid target.

        Check whether the predicate is a known relationship, attribute
        or annotation in this session's ontology, and points to a
        "valid" target:

        - Attributes must point to literals.
        - Relationships must point to individuals being copied
          simultaneously into the session.
        - Annotations can point to anything.

        The word "valid" is written with quotation marks because it is
        arguably just a superset of what is really valid (e.g. it is
        not checked that the data type of literals match the range of
        the attributes).

        Args:
            s: Subject of the statement.
            p: Predicate to be evaluated.
            o: Target of the predicate.
            exception: When a value is given, an exception instead of a
                warning is emitted if an "invalid" target is identified.
                The value is used to identify the ontology individual is
                related to.

        Returns:
            The predicate points to a "valid" target.
        """
        predicate = is_known(p)
        if isinstance(predicate, OntologyAttribute):
            result = isinstance(o, Literal)
        elif isinstance(predicate, OntologyRelationship):
            # Relationships are only kept when the target individual is
            # copied in the same `add` call.
            result = o in identifiers
        elif isinstance(predicate, OntologyAnnotation):
            result = True
        else:  # isinstance(predicate, type(None)):
            result = False
        if not result:
            if not predicate:
                text = (
                    f"Individual {s} is the subject of a statement "
                    f"that has {p} as predicate, which does not match any "
                    f"annotation, relationship or attribute from the "
                    f"installed ontologies."
                )
            elif isinstance(predicate, OntologyAttribute):
                text = (
                    f"Individual {s} is the subject of a RDF "
                    f"statement that has {predicate} as predicate. "
                    f"{predicate} is an ontology attribute, but "
                    f"the object of the statement "
                    f"is not a literal."
                )
            else:
                # NOTE(review): relationships targeting individuals that
                # are not being copied are silently dropped (no warning).
                text = ""
            if text:
                if exception:
                    raise RuntimeError(
                        text + " Set the keyword argument `all_triples`"
                        "to `True` to ignore this error."
                    )
                else:
                    logger.warning(
                        "Accepting uninterpretable RDF statement: " + text
                    )
        return result

    # Statements to copy: type statements always, plus "valid" ones, plus
    # everything when `all_triples` is set.
    add = (
        (s, p, o)
        for individual in individuals
        for s, p, o in individual.session.graph.triples(
            (individual.identifier, None, None)
        )
        if (
            p == RDF.type
            or is_valid(s, p, o, exception=not all_triples)
            or all_triples
        )
    )
    if not merge:
        """Replace previous individuals if merge is False."""
        for pattern in delete:
            self.graph.remove(pattern)
    self.graph.addN((s, p, o, self.graph) for s, p, o in add)
    # Transfer file contents for file individuals from other sessions.
    files = ((file.identifier, file.operations.handle) for file in files)
    for identifier, contents in files:
        self.from_identifier_typed(
            identifier, typing=OntologyIndividual
        ).operations.overwrite(contents)
    added_objects = list(
        self.from_identifier_typed(identifier, typing=OntologyIndividual)
        for identifier in identifiers
    )
    # Single individual added -> return it (or None); otherwise a list.
    return (
        next(iter(added_objects), None)
        if len(added_objects) <= 1
        else added_objects
    )
def delete(
    self,
    *entities: Union[
        Union[OntologyEntity, Identifier],
        Iterable[Union[OntologyEntity, Identifier]],
    ],
):
    """Remove ontology individuals from the session.

    Args:
        entities: Ontology individuals to remove from the session. It is
            also possible to just provide their identifiers.

    Raises:
        ValueError: When at least one of the given ontology individuals is
            not contained in the session.
    """
    # Flatten the arguments: bare entities/identifiers may be mixed with
    # iterables of them.
    flattened = frozenset(
        entity
        for x in entities
        for entity in (
            (x,) if isinstance(x, (OntologyEntity, Identifier)) else x
        )
    )
    # Validate everything before mutating the graph.
    for entity in flattened:
        if isinstance(entity, OntologyEntity) and entity not in self:
            raise ValueError(f"Entity {entity} not contained in {self}.")
    for entity in flattened:
        identifier = (
            entity.identifier
            if isinstance(entity, OntologyEntity)
            else entity
        )
        self._track_identifiers(identifier, delete=True)
        # Drop statements where the entity is subject or object.
        self._graph.remove((identifier, None, None))
        self._graph.remove((None, None, identifier))
def clear(self, force: bool = False):
    """Clear all the data stored in the session.

    Args:
        force: Try to clear read-only sessions too.
    """
    target = self._graph_writable if force else self._graph
    target.remove((None, None, None))
    self._namespaces.clear()
    # All cached entity data is now stale; invalidate the caches.
    self.entity_cache_timestamp = datetime.now()
    self.from_identifier.cache_clear()
    self.from_label.cache_clear()
    # Reload the essential TBox required by ontologies.
    if self.ontology is self:
        for parser_name in ("simphony", "owl", "rdfs"):
            self.ontology.load_parser(
                OntologyParser.get_parser(parser_name)
            )
def get(
    self,
    *individuals: Union[OntologyIndividual, Identifier, str],
    oclass: Optional[OntologyClass] = None,
) -> Union[
    Set[OntologyIndividual],
    Optional[OntologyIndividual],
    Tuple[Optional[OntologyIndividual]],
]:
    """Return the individuals in the session.

    The structure of the output can vary depending on the form used for
    the call. See the "Returns:" section of this
    docstring for more details on this.

    Note: If you are reading the SimPhoNy documentation API Reference, it
    is likely that you cannot read this docstring. As a workaround, click
    the `source` button to read it in its raw form.

    Args:
        individuals: Restrict the individuals to be returned to a certain
            subset of the individuals in the session.
        oclass: Only yield ontology individuals which belong to a subclass
            of the given ontology class. Defaults to None (no filter).

    Returns:
        Calls without `*individuals` (SessionSet): The result of the
            call is a set-like object. This corresponds to
            the calls `get()`, `get(oclass=___)`.
        Calls with `*individuals` (Optional[OntologyIndividual],
            Tuple[Optional["OntologyIndividual"], ...]):
            The position of each element in the result is determined by the
            position of the corresponding identifier/individual in the
            given list of identifiers/individuals. In this case, the result
            can contain `None` values if a given identifier/individual is
            not in the session, or if it does not satisfy the class
            filter. This description corresponds to the calls
            `get(*individuals)`, `get(*individuals, oclass=`___`)`.

    Raises:
        TypeError: Objects that are not ontology individuals,
            identifiers or strings provided as positional arguments.
        TypeError: Object that is not an ontology class passed as
            keyword argument `oclass`.
        RuntimeError: Ontology individuals that belong to a different
            session provided.
    """
    identifiers = list(individuals)
    for i, x in enumerate(identifiers):
        if not isinstance(x, (OntologyIndividual, Identifier, str)):
            raise TypeError(
                f"Expected {OntologyIndividual}, {Identifier} or {str} "
                f"objects, not {type(x)}."
            )
        elif isinstance(x, OntologyIndividual) and x not in self:
            raise RuntimeError(
                "Cannot get an individual that belongs to "
                "a different session."
            )
        # Normalize the argument to an identifier.
        if isinstance(x, str):
            if not isinstance(x, Identifier):
                identifiers[i] = URIRef(x)
        elif isinstance(x, OntologyIndividual):
            identifiers[i] = x.identifier
    # Fix: validate `oclass` as `iter` does. Previously `get` silently
    # accepted any object for `oclass`, despite its docstring promising
    # a TypeError.
    if oclass is not None and not isinstance(oclass, OntologyClass):
        raise TypeError(
            "Found object of type %s passed to argument "
            "oclass. Should be an OntologyClass." % type(oclass)
        )
    if identifiers:
        entities = [None] * len(identifiers)
        for i, identifier in enumerate(identifiers):
            try:
                entity = self.from_identifier(identifier)
            except KeyError:
                entity = None
            # Entities failing the class filter are reported as missing.
            if entity and oclass and not entity.is_a(oclass):
                entity = None
            entities[i] = entity
        # One positional argument -> a single (optional) individual;
        # several -> a tuple preserving argument order.
        if len(identifiers) == 1:
            entities = entities[0]
        else:
            entities = tuple(entities)
    else:
        entities = SessionSet(session=self, oclass=oclass)
    return entities
def iter(
    self,
    *individuals: Union[OntologyIndividual, Identifier, str],
    oclass: Optional[OntologyClass] = None,
) -> Union[
    Iterator[OntologyIndividual],
    Iterator[Optional[OntologyIndividual]],
]:
    """Iterate over the ontology individuals in the session.

    The structure of the output can vary depending on the form used for
    the call. See the "Returns:" section of this docstring for more
    details on this.

    Note: If you are reading the SimPhoNy documentation API Reference, it
    is likely that you cannot read this docstring. As a workaround, click
    the `source` button to read it in its raw form.

    Args:
        individuals: Restrict the individuals to be returned to a certain
            subset of the individuals in the session.
        oclass: Only yield ontology individuals which belong to a subclass
            of the given ontology class. Defaults to None (no filter).

    Returns:
        Calls without `*individuals` (Iterator[OntologyIndividual]): The
            position of each element in the result is non-deterministic.
            This corresponds to the calls `iter()`, `iter(oclass=___)`.
        Calls with `*individuals` (Iterator[Optional[
            OntologyIndividual]]):
            The position of each element in the result is determined by the
            position of the corresponding identifier/individual in the
            given list of identifiers/individuals. In this case, the result
            can contain `None` values if a given identifier/individual is
            not in the session, or if it does not satisfy the class
            filter. This description corresponds to the calls
            `iter(*individuals)`, `iter(*individuals, oclass=`___`)`.

    Raises:
        TypeError: Objects that are not ontology individuals,
            identifiers or strings provided as positional arguments.
        TypeError: Object that is not an ontology class passed as
            keyword argument `oclass`.
        RuntimeError: Ontology individuals that belong to a different
            session provided.
    """
    identifiers = list(individuals)
    for i, x in enumerate(identifiers):
        if not isinstance(x, (OntologyIndividual, Identifier, str)):
            raise TypeError(
                f"Expected {OntologyIndividual}, {Identifier} or {str} "
                f"objects, not {type(x)}."
            )
        elif isinstance(x, OntologyIndividual) and x not in self:
            raise RuntimeError(
                "Cannot get an individual that belongs to "
                "a different session."
            )
        # Normalize the argument to an identifier.
        if isinstance(x, str):
            if not isinstance(x, Identifier):
                identifiers[i] = URIRef(x)
        elif isinstance(x, OntologyIndividual):
            identifiers[i] = x.identifier
    if oclass is not None and not isinstance(oclass, OntologyClass):
        raise TypeError(
            "Found object of type %s passed to argument "
            "oclass. Should be an OntologyClass." % type(oclass)
        )
    if identifiers:
        # The yield statement is encapsulated inside a function so that the
        # main function uses the return statement instead of yield. In this
        # way, exceptions are checked when the `iter` method is called
        # instead of when asking for the first result.
        def iterator() -> Iterator[Optional[OntologyIndividual]]:
            for identifier in identifiers:
                try:
                    entity = self.from_identifier(identifier)
                except KeyError:
                    entity = None
                # Entities failing the class filter are yielded as None.
                if entity and oclass and not entity.is_a(oclass):
                    entity = None
                yield entity

        return iterator()
    else:
        return iter(SessionSet(session=self, oclass=oclass))
# ↑ --------------------- Public API --------------------- ↑ #
default_ontology: Session
"""The default ontology.
When no T-Box is explicitly assigned to a session, this is the ontology
it makes use of.
"""
entity_cache_timestamp: Optional[datetime] = None
"""A timestamp marking the time when the session's graph was last modified.
This timestamp is used by `OntologyEntity` and its subclasses to know
whether they should invalidate their cache (e.g. the cache of the
`superclasses` method must be invalidated when the session is cleared or a
new ontology is loaded into the session).
"""
@property
def ontology(self) -> Session:
    """Another session considered to be the T-Box of this one.

    In a normal setting, a session is considered only to contain an A-Box.
    When it is necessary to look for a class, a relationship, an attribute
    or an annotation property, the session will look there for their
    definition.
    """
    # Falls back to the installed-ontologies session when unset.
    return self._ontology or Session.default_ontology

@ontology.setter
def ontology(self, value: Optional[Session]) -> None:
    """Set the T-Box of this session."""
    if not isinstance(value, (Session, type(None))):
        # Fix: report the offending object's type; the message previously
        # interpolated the value itself after the words "not type".
        raise TypeError(
            f"Expected {Session} or {type(None)}, not type {type(value)}."
        )
    self._ontology = value
_ontology: Optional[Session] = None
"""Private pointer to the T-Box of the session.
Not `None` only when the T-Box of the session should be different from
the default T-Box (the one referred to by the attribute `default_ontology`,
which is by default a session containing all the installed ontologies).
"""
label_predicates: Tuple[URIRef] = (SKOS.prefLabel, RDFS.label)
"""The identifiers of the RDF predicates to be considered as labels.
The entity labels are used, for example, to be able to get ontology
entities from namespace or session objects by such label.
The order in which the properties are specified in the tuple matters. To
determine the label of an object, the properties will be checked from
left to right, until one of them is defined for that specific entity.
This will be the label of such ontology entity. The rest of the
properties to the right of such property will be ignored for that
entity.
For example, in the default case above, if an entity has an
`SKOS.prefLabel` it will be considered to be its label, even if it also
has an `RDFS.label`, which will be ignored. If another entity has no
`SKOS.prefLabel` but has a `RDFS.label`, then the `RDFS.label` will
define its label. This means that for some entity, one label property
may be used while for another, a different property can be in use. If
none of the properties are defined, then the entity is considered to
have no label.
"""
label_languages: Tuple[URIRef] = ("en",)
# TODO: Set to user's language preference from the OS (users can usually
# set such a list in modern operating systems).
"""The preferred languages for the default label.
Normally, entities will be available from all languages. However,
in some places the label has to be printed. In such cases this default
label will be used.
When defining the label for an object as described in the
`label_predicates` docstring above, this list will also be checked from
left to right. When one of the languages specified is available,
this will define the default label. Then the default label will default to
english. If also not available, then any language will be used.
"""
def __init__(
    self,
    base: Optional[Graph] = None,  # The graph must be OPEN already.
    driver: Optional[InterfaceDriver] = None,
    ontology: Optional[Union[Session, bool]] = None,
    identifier: Optional[str] = None,
    namespaces: Optional[Dict[str, URIRef]] = None,
    from_parser: Optional[OntologyParser] = None,
):
    """Initializes the session.

    The keyword arguments are used internally by SimPhoNy and are not meant
    to be set manually.

    Args:
        base: Graph to base the session on; when omitted a new empty
            graph is created. The graph must be open already.
        driver: The interface driver that realizes the base graph, if
            any.
        ontology: Either a session to use as the T-Box of this one, or
            `True` to make the session its own T-Box.
        identifier: A string identifying the session.
        namespaces: Namespace bindings (name to IRI) to install on the
            session. Ignored when `from_parser` is given.
        from_parser: Fill the session's graph from the contents of an
            ontology parser.

    Raises:
        TypeError: An object that is neither a `Session`, `True` nor
            `None` was passed as the `ontology` argument.
        RuntimeError: A parser was given, but the session is not its own
            T-Box.
    """
    # NOTE(review): the original line carried a stray "[docs]" Sphinx
    # link artifact before `def`, which is invalid Python; removed.
    # The `namespaces` annotation was also missing `Optional`.
    super().__init__()
    self._environment_references.add(self)
    # Base the session graph either on a store if passed or an empty
    # graph.
    if base is not None:
        self._graph_writable = base
        self._graph = base
    else:
        graph = Graph()
        self._graph_writable = graph
        self._graph = graph
    self._interface_driver = driver
    # Configure the ontology for this session.
    if isinstance(ontology, Session):
        self.ontology = ontology
    elif ontology is True:
        # The session is its own T-Box: expose the graph as read-only so
        # that normal (A-Box) operations cannot modify the ontology.
        self._graph = ReadOnlyGraphAggregate([self._graph_writable])
        self.ontology = self
    elif ontology is not None:
        raise TypeError(
            f"Invalid ontology argument: {ontology}."
            f"Expected either a {Session} or {bool} object, "
            f"got {type(ontology)} instead."
        )
    if self.ontology is not self:
        # Bypass the `lru_cache_weak` caches when this session is not a
        # T-Box, since its contents may change at any time.

        def bypass_cache(method: Callable):
            # Unwrap the cached method and call the bare function
            # directly; provide a no-op `cache_clear` for API parity.
            wrapped_func = method.__wrapped__

            @wraps(wrapped_func)
            def bypassed(*args, **kwargs):
                return wrapped_func(self, *args, **kwargs)

            bypassed.cache_clear = lambda: None
            return bypassed

        self.from_identifier = bypass_cache(self.from_identifier)
        self.from_label = bypass_cache(self.from_label)
    else:
        # Log the time of last entity cache clearing.
        self.entity_cache_timestamp = datetime.now()
    self._entity_cache = dict()
    self.creation_set = set()
    self._storing = list()
    self._namespaces = dict()
    # Load the essential T-Box required by ontologies.
    if self.ontology is self:
        for parser in (
            OntologyParser.get_parser("simphony"),
            OntologyParser.get_parser("owl"),
            OntologyParser.get_parser("rdfs"),
        ):
            self.ontology.load_parser(parser)
    if from_parser:  # Compute session graph from an ontology parser.
        if self.ontology is not self:
            raise RuntimeError(
                "Cannot load parsers in sessions which "
                "are not their own ontology. Load the "
                "parser on the ontology instead."
            )
        if namespaces is not None:
            logger.warning(
                f"Namespaces bindings {namespaces} ignored, "
                f"as the session {self} is being created from "
                f"a parser."
            )
        self.load_parser(from_parser)
        self.identifier = identifier or from_parser.identifier
    else:  # Create an empty session.
        self.identifier = identifier
        namespaces = namespaces if namespaces is not None else dict()
        for key, value in namespaces.items():
            self.bind(key, value)
def __str__(self):
    """Render a human-readable representation of the session."""
    # TODO: Return the kind of RDFLib store attached too.
    label = "" if self.identifier is None else self.identifier
    cls = self.__class__
    return f"<{cls.__module__}.{cls.__name__}: {label} at {hex(id(self))}>"
@lru_cache_weak(maxsize=4096)
# On `__init__.py` there is an option to bypass this cache when the
# session is not a T-Box.
def from_identifier(self, identifier: Node) -> OntologyEntity:
    """Get an ontology entity from its identifier.

    Args:
        identifier: The identifier of the entity.

    Raises:
        KeyError: The ontology entity is not stored in this session.
        RuntimeError: More than one Python class could be spawned from
            the identifier (ambiguous entity).

    Returns:
        The OntologyEntity.
    """
    # WARNING: This method is a central point in SimPhoNy. Change with
    # care.
    # TIP: Since the method is a central point in SimPhoNy, any
    # optimization it gets will speed up SimPhoNy, while bad code in
    # this method will slow it down.

    # Look for embedded classes: map each rdf:type of the identifier to
    # the set of compatible Python classes.
    compatible = {
        rdf_type: compatible_classes(rdf_type, identifier)
        for rdf_type in self._graph.objects(identifier, RDF_type)
    }
    # If not an embedded class, then the type may be known in
    # the ontology. This means that an ontology individual would
    # have to be spawned.
    for rdf_type, found in compatible.items():
        if not found:
            try:
                # Raises KeyError if the type is unknown to the T-Box.
                self.ontology.from_identifier(rdf_type)
                found |= {OntologyIndividual}
                break
            except KeyError:
                pass
    # Flatten the per-type sets into a single set of candidate classes.
    compatible = set().union(*compatible.values())

    # An `owl:inverseOf` statement implies a relationship even when no
    # rdf:type statement says so.
    if (
        OntologyRelationship not in compatible
        and (identifier, OWL_inverseOf, None) in self._graph
    ):
        compatible |= {OntologyRelationship}

    # Some ontologies are hybrid RDFS and OWL ontologies (i.e. FOAF).
    # In such cases, object and datatype properties are preferred to
    # annotation properties.
    if OntologyAnnotation in compatible and (
        compatible & {OntologyRelationship, OntologyAttribute}
    ):
        compatible.remove(OntologyAnnotation)

    # Finally return the single compatible class or raise an exception.
    if len(compatible) >= 2:
        raise RuntimeError(
            f"Two or more python classes ("
            f"{', '.join(map(str, compatible))}) "
            f"could be spawned from {identifier}."
        )
    try:
        python_class = compatible.pop()
        return python_class(uid=UID(identifier), session=self, merge=None)
    except KeyError:
        # `compatible` was empty: nothing matches the identifier.
        raise KeyError(
            f"Identifier {identifier} does not match any OWL "
            f"entity, any entity natively supported by "
            f"SimPhoNy, nor an ontology individual "
            f"belonging to a class in the ontology."
        )
def from_identifier_typed(
    self, identifier: Node, typing: Type[ENTITY]
) -> ENTITY:
    """Get an ontology entity from its identifier, enforcing a type check.

    Args:
        identifier: The identifier of the entity.
        typing: The expected type of the ontology entity matching the
            given identifier.

    Raises:
        KeyError: The ontology entity is not stored in this session.
        TypeError: The entity does not match the expected type.

    Returns:
        The OntologyEntity.
    """
    result = self.from_identifier(identifier)
    if isinstance(result, typing):
        return result
    raise TypeError(f"{identifier} is not of class {typing}.")
def merge(self, entity: OntologyEntity) -> None:
    """Merge a given ontology entity with what is in the session.

    Copies the ontology entity to the session, but does not remove any
    old triples referring to the entity.

    Args:
        entity: The ontology entity to store.
    """
    # `False` selects the merge behavior of the shared helper.
    self._update_and_merge_helper(entity, False)
def update(self, entity: OntologyEntity) -> None:
    """Store a copy of given ontology entity in the session.

    Args:
        entity: The ontology entity to store.
    """
    # `True` selects the update (replace) behavior of the shared helper.
    self._update_and_merge_helper(entity, True)
@property
def namespaces(self) -> List[OntologyNamespace]:
    """Get all the namespaces bound to the session."""
    ontology = self.ontology
    bound = []
    for iri, name in ontology._namespaces.items():
        bound.append(
            OntologyNamespace(iri=iri, name=name, ontology=ontology)
        )
    return bound
def bind(self, name: Optional[str], iri: Union[str, URIRef]):
    """Bind a namespace to this session.

    Args:
        name: the name to bind. The name is optional, a namespace object
            can be bound without name.
        iri: the IRI of the namespace to be bound to such name.

    Raises:
        ValueError: The name is already bound to a different namespace.
    """
    iri = URIRef(iri)
    ontology = self.ontology
    # Reject the binding when the name is already taken by another IRI.
    # (Replaces the original `for ... else` construct: with no `break`
    # in the loop, the `else` always ran, so it only obscured the
    # check-then-bind logic.)
    for key, value in ontology._namespaces.items():
        if value == name and key != iri:
            raise ValueError(
                f"Namespace {key} is already bound to name "
                f"{name} in ontology {self}. "
                f"Please unbind it first."
            )
    ontology._namespaces[iri] = name
    ontology._graph_writable.bind(name, iri)
def unbind(self, name: Union[str, URIRef]):
    """Unbind a namespace from this session.

    Args:
        name: the name to which the namespace is already bound, or the
            IRI of the namespace.
    """
    namespaces = self.ontology._namespaces
    # Snapshot the items first so that deletion does not disturb the
    # iteration.
    matches = [
        key
        for key, value in list(namespaces.items())
        if value == name or key == URIRef(name)
    ]
    for key in matches:
        del namespaces[key]
def get_namespace_bind(
    self, namespace: Union[OntologyNamespace, URIRef, str]
) -> Optional[str]:
    """Returns the name used to bind a namespace to the ontology.

    Args:
        namespace: Either an OntologyNamespace or the IRI of a namespace.

    Raises:
        KeyError: Namespace not bound to the ontology.
    """
    if isinstance(namespace, OntologyNamespace):
        owner = namespace.ontology
        iri = namespace.iri
    else:
        owner = self.ontology
        iri = URIRef(namespace)
    error = KeyError(
        f"Namespace {iri} not bound to " f"ontology {self}."
    )
    # A namespace object belonging to a foreign ontology is never bound
    # to this one.
    if owner is not self.ontology:
        raise error
    try:
        return self.ontology._namespaces[iri]
    except KeyError:
        raise error
def get_namespace(self, name: Union[str, URIRef]) -> OntologyNamespace:
    """Get a namespace registered with the session.

    Args:
        name: The namespace name or IRI to search for.

    Returns:
        The ontology namespace.

    Raises:
        KeyError: Namespace not found.
    """
    known = self.namespaces
    candidates: Iterator = iter(())
    if isinstance(name, URIRef):
        candidates = (ns for ns in known if ns.iri == name)
    elif isinstance(name, str):
        by_name = (ns for ns in known if ns.name == name)
        # Last resort: user provided string but may be an IRI.
        by_iri = (ns for ns in known if ns.iri == URIRef(name))
        candidates = itertools.chain(by_name, by_iri)
    match = next(candidates, None)
    if match is None:
        raise KeyError(f"Namespace {name} not found in ontology {self}.")
    return match
@property
def graph(self) -> Graph:
    """Returns the session's graph.

    For a session that is its own T-Box this is a read-only view of the
    underlying writable graph (see `__init__`).
    """
    return self._graph
@property
def driver(self) -> Optional[InterfaceDriver]:
    """The SimPhoNy interface on which the base graph is based on.

    Points to the interface responsible for realizing the base graph of
    the session. Not all graphs have to be based on an interface. In such
    cases, the value of this attribute is `None`.
    """
    return self._interface_driver
@classmethod
def get_default_session(cls) -> Optional[Session]:
    """Returns the default session."""
    # Walk the stack from the most recently pushed environment down,
    # returning the first one that is a session.
    for environment in reversed(cls._stack_default_environment):
        if isinstance(environment, Session):
            return environment
    return None
@classmethod
def set_default_session(cls, session: Session):
    """Sets the first session of the stack of sessions.

    This effectively makes it the default. The method will not work if
    there are any other default environments in the stack.
    """
    stack = cls._stack_default_environment
    if len(stack) > 1:
        raise RuntimeError(
            "The default session cannot be changed when "
            "there are other environments in the stack."
        )
    # Drop the current default (if any) and install the new one.
    if stack:
        stack.pop()
    stack.append(session)
def load_parser(self, parser: OntologyParser):
    """Merge ontology packages with this ontology from a parser object.

    Args:
        parser: the ontology parser from where to load the new namespaces.

    Raises:
        ModificationException: The session is not its own T-Box, so its
            ontology cannot be modified directly.
    """
    if self.ontology is not self:
        raise ModificationException()
    self._graph_writable += parser.graph
    bindings = parser.namespaces
    for prefix, uri in bindings.items():
        self.bind(prefix, uri)
    # The T-Box changed: stamp the change and drop the entity caches.
    self.entity_cache_timestamp = datetime.now()
    self.from_identifier.cache_clear()
    self.from_label.cache_clear()
def iter_identifiers(self) -> Iterator[Union[BNode, URIRef]]:
    """Iterate over all the ontology entity identifiers in the session."""
    # Warning: identifiers can be repeated.
    supported_entity_types = frozenset(
        {
            # owl:AnnotationProperty
            OWL.AnnotationProperty,
            RDF.Property,
            # owl:DatatypeProperty
            OWL.DatatypeProperty,
            # owl:ObjectProperty
            OWL.ObjectProperty,
            # owl:Class
            OWL.Class,
            RDFS.Class,
            # owl:Restriction
            OWL.Restriction,
        }
    )
    # Yield the entities from the T-Box (literals filtered out).
    if self.ontology is self:
        for entity_type in supported_entity_types:
            for subject in self.ontology.graph.subjects(
                RDF.type, entity_type
            ):
                if not isinstance(subject, Literal):
                    yield subject
    # Yield the entities from the A-Box (literals filtered out). A
    # subject qualifies when its rdf:type is one of the supported entity
    # types according to the T-Box.
    for subject, _, rdf_type in self._graph.triples((None, RDF.type, None)):
        if isinstance(subject, Literal):
            continue
        if any(
            (rdf_type, RDF.type, entity_type) in self.ontology.graph
            for entity_type in supported_entity_types
        ):
            yield subject
def iter_labels(
    self,
    entity: Optional[Union[Identifier, OntologyEntity]] = None,
    lang: Optional[str] = None,
    return_prop: bool = False,
    return_literal: bool = True,
    return_identifier: bool = False,
) -> Iterator[
    Union[
        Literal,
        str,
        Tuple[Union[Literal, str], Node],
        Tuple[Union[Literal, str], Node, Node],
    ]
]:
    """Iterate over all the labels of the entities in the session.

    Args:
        entity: Restrict the iteration to the labels of this entity;
            `None` iterates over the labels of all entities.
        lang: Restrict the labels to this language tag. The empty string
            matches labels without a language tag; `None` matches any.
        return_prop: Also yield the label predicate.
        return_literal: Yield the label as a `Literal` instead of `str`.
        return_identifier: Also yield the identifier of the labelled
            entity.
    """
    from simphony_osp.ontology.entity import OntologyEntity

    if isinstance(entity, OntologyEntity):
        entity = entity.identifier

    def language_matches(literal) -> bool:
        # `None` accepts every language; `""` only untagged literals.
        if lang is None:
            return True
        if lang == "":
            return literal.language is None
        return literal.language == lang

    def shape(prop, literal, subject):
        # Assemble the requested fields in the fixed order
        # (label, predicate, identifier); single fields are unwrapped.
        label = literal if return_literal else str(literal)
        fields = (label,)
        if return_prop:
            fields += (prop,)
        if return_identifier:
            fields += (subject,)
        return fields if len(fields) > 1 else fields[0]

    return (
        shape(prop, literal, subject)
        for prop in self.label_predicates
        for subject, _, literal in self._graph.triples((entity, prop, None))
        if language_matches(literal)
    )
def get_identifiers(self) -> Set[Identifier]:
    """Get all the identifiers in the session."""
    return {identifier for identifier in self.iter_identifiers()}
def get_entities(self) -> Set[OntologyEntity]:
    """Get all the entities stored in the session."""
    # Iterating over a session yields its entities.
    return set(iter(self))
# Driver of the SimPhoNy interface realizing the base graph, if any
# (set by `__init__`; exposed through the `driver` property).
_interface_driver: Optional[InterfaceDriver] = None
def _update_and_merge_helper(
    self,
    entity: OntologyEntity,
    mode: bool,
    visited: Optional[set] = None,
) -> None:
    """Private `merge` and `update` helper.

    Copies `entity` into this session; individuals reachable through
    active relationships are copied recursively.

    Args:
        entity: The ontology entity to merge.
        mode: True means update, False means merge.
        visited: Entities that have already been updated or merged.
    """
    if entity.session is None:  # Newly created entity.
        if mode:
            # Update: drop all pre-existing triples about the entity.
            self._graph.remove((entity.iri, None, None))
        for t in entity.graph.triples((None, None, None)):
            self._graph.add(t)
    elif entity not in self:  # Entity from another session.
        active_relationship = self.ontology.from_identifier(
            simphony_namespace.activeRelationship
        )
        try:
            existing = self.from_identifier(entity.identifier)
        except KeyError:
            existing = None
        self._track_identifiers(entity.identifier)
        # Clear old types, active relationships and attributes for
        # the update operation.
        if existing and mode:
            # Clear old types.
            self._graph.remove((existing.identifier, RDF.type, None))
            for p in existing.graph.predicates(existing.identifier, None):
                try:
                    predicate = self.ontology.from_identifier(p)
                except KeyError:
                    # Predicate unknown to the T-Box: leave its triples
                    # untouched.
                    continue
                # Clear attributes or active relationships.
                if isinstance(predicate, OntologyAttribute) or (
                    isinstance(predicate, OntologyRelationship)
                    and active_relationship in predicate.superclasses
                ):
                    self._graph.remove((existing.identifier, p, None))
        # Merge new types, active relationships and attributes.
        # The double for loop pattern (first loop over p, then loop over o)
        # is used because calling `self.ontology.from_identifier` is
        # expensive.
        visited = visited if visited is not None else set()
        for p in entity.graph.predicates(entity.identifier, None):
            try:
                predicate = self.ontology.from_identifier(p)
            except KeyError:
                # Merge new types.
                if p == RDF.type:
                    for o in entity.graph.objects(entity.identifier, p):
                        self._graph.add((entity.identifier, p, o))
                continue
            for o in entity.graph.objects(entity.identifier, p):
                if isinstance(predicate, OntologyAttribute):
                    # Merge attributes.
                    self._graph.add((entity.identifier, p, o))
                elif (
                    isinstance(predicate, OntologyRelationship)
                    and active_relationship in predicate.superclasses
                ):
                    # Merge active relationships: recurse into the
                    # target individual first; `visited` prevents
                    # infinite recursion on cyclic graphs.
                    obj = entity.session.from_identifier(o)
                    if not isinstance(obj, OntologyIndividual):
                        continue
                    if obj.identifier not in visited:
                        visited.add(obj.identifier)
                        self._update_and_merge_helper(obj, mode, visited)
                    self._graph.add((entity.identifier, p, o))
# Identifiers stored while inside a context manager (see
# `_track_identifiers`).
creation_set: Set[Identifier]
# Namespace bindings of the session: IRI -> bound name.
_namespaces: Dict[URIRef, str]
# Graph exposed to users (may be a read-only view, see `__init__`).
_graph: Graph
# Underlying writable graph backing the session.
_graph_writable: Graph
# SimPhoNy interface the session is based on, if any — TODO confirm
# how this differs from `_interface_driver`; not used in this view.
_driver: Optional[Interface] = None
@property
def _session_linked(self) -> Session:
    # A session is its own linked session (other environment types
    # presumably point elsewhere — cannot tell from this view).
    return self
def _track_identifiers(self, identifier, delete=False):
    """Keep track of new additions while inside a context manager.

    Args:
        identifier: The identifier being stored or removed.
        delete: Whether the identifier is being removed (`True`) or
            stored (`False`).
    """
    if delete:
        self.creation_set.discard(identifier)
        return
    # Only identifiers with no pre-existing triples count as new.
    first_triple = next(
        self._graph.triples((identifier, None, None)), None
    )
    if first_triple is None:
        self.creation_set.add(identifier)
class QueryResult(SPARQLResult):
    """SPARQL query result."""

    # Session used to spawn ontology individuals from result identifiers.
    session: Session

    def __init__(self, *args, session: Optional[Session] = None, **kwargs):
        """Initialize the query result.

        Namely, a session is linked to this query result so that if
        ontology individuals are requested, they can be spawned on that
        session.

        Args:
            session: Session to which this result is linked to.
        """
        self.session = session or Session.get_default_session()
        super().__init__(*args, **kwargs)

    # ↓ --------------------- Public API --------------------- ↓ #

    def __call__(
        self, **kwargs
    ) -> Union[Iterator[Triple], Iterator[bool], Iterator[ResultRow]]:
        """Select the datatypes of the query results for SELECT queries.

        Args:
            **kwargs: For each variable name on the query, a callable
                can be specified as keyword argument. When retrieving
                results, this callable will be run on the RDFLib item from
                the result. Literals are an exception. The callable will
                be applied on top of the result of the `toPython()` method
                of the literal.

        Raises:
            ValueError: When the query that produced this result object is
                not a SELECT query.
        """
        if self.type != "SELECT":
            if kwargs:
                raise ValueError(
                    f"Result datatypes cannot be converted for "
                    f"{self.type} queries."
                )
            yield from self
            return
        for key, value in kwargs.items():
            # Filter certain provided callables and replace them by others.
            if isclass(value) and issubclass(value, OntologyIndividual):
                # Replace OntologyIndividual with spawning the individual
                # from its identifier. (The lambda captures only `self`,
                # not the loop variables, so late binding is not an issue
                # here.)
                kwargs[key] = lambda x: self.session.from_identifier_typed(
                    x, typing=OntologyIndividual
                )
        for row in self:
            # Yield the rows with the applied datatype transformation.
            # Non-Literal items are passed to the callable as-is;
            # Literals are first converted via `toPython()`.
            values = {
                Variable(var): kwargs.get(str(var), lambda x: x)(
                    row[i]
                    if not isinstance(row[i], Literal)
                    else row[i].toPython()
                )
                for i, var in enumerate(self.vars)
            }
            yield ResultRow(values, self.vars)

    # ↑ --------------------- Public API --------------------- ↑ #
# The session holding the installed ontologies; it acts as its own T-Box
# (`ontology=True`).
Session.default_ontology = Session(
    identifier="default ontology", ontology=True
)
# The default A-Box session available out of the box.
Session.set_default_session(Session(identifier="default session"))
# This default ontology is later overwritten by simphony_osp/utils/pico.py