Source code for mowl.base_models.elmodel

from mowl.ontology.normalize import ELNormalizer
from mowl.base_models.model import Model
from mowl.datasets.el import ELDataset
from mowl.projection import projector_factory
import torch as th
from torch.utils.data import DataLoader, default_collate

from deprecated.sphinx import versionadded, versionchanged

from org.semanticweb.owlapi.model import OWLClassExpression, OWLClass, OWLObjectSomeValuesFrom, OWLObjectIntersectionOf

import copy
import numpy as np
import mowl.error.messages as msg
import os

@versionchanged(version="1.0.0", reason="Added the 'load_normalized' parameter.")
class EmbeddingELModel(Model):
    """Abstract class for :math:`\mathcal{EL}` embedding methods.

    :param dataset: mOWL dataset to use for training and evaluation.
    :type dataset: :class:`mowl.datasets.Dataset`
    :param embed_dim: The embedding dimension.
    :type embed_dim: int
    :param batch_size: The batch size to use for training.
    :type batch_size: int
    :param extended: If ``True``, the model works with all 7 EL normal forms. This is \
reflected in the :class:`DataLoaders` that are generated, and the model must provide \
7 loss functions. If ``False``, the model works with 4 normal forms only, merging the \
3 extra ones into their corresponding base normal forms. Defaults to ``True``.
    :type extended: bool, optional
    :param load_normalized: If ``True``, the ontology is assumed to be normalized already \
and GCIs are extracted directly. Defaults to ``False``.
    :type load_normalized: bool, optional
    :param device: The device to use for training. Defaults to ``"cpu"``.
    :type device: str, optional
    """

    def __init__(self, dataset, embed_dim, batch_size, extended=True, model_filepath=None,
                 load_normalized=False, device="cpu"):
        super().__init__(dataset, model_filepath=model_filepath)

        if not isinstance(embed_dim, int):
            raise TypeError("Parameter 'embed_dim' must be of type int.")

        if not isinstance(batch_size, int):
            raise TypeError("Parameter 'batch_size' must be of type int.")

        if not isinstance(extended, bool):
            raise TypeError("Optional parameter 'extended' must be of type bool.")

        if not isinstance(load_normalized, bool):
            raise TypeError("Optional parameter 'load_normalized' must be of type bool.")

        if not isinstance(device, str):
            raise TypeError("Optional parameter 'device' must be of type str.")

        self._datasets_loaded = False
        self._dataloaders_loaded = False
        self._extended = extended
        self.embed_dim = embed_dim
        self.batch_size = batch_size
        self.device = device
        self.load_normalized = load_normalized

        self._training_datasets = None
        self._validation_datasets = None
        self._testing_datasets = None

        self._loaded_eval = False
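    # Usage sketch (hypothetical hyperparameters): EmbeddingELModel is abstract,
    # so in practice a concrete subclass such as ELEmbeddings is instantiated,
    # e.g. on a built-in dataset:
    #
    #     from mowl.datasets.builtin import PPIYeastSlimDataset
    #     from mowl.models import ELEmbeddings
    #
    #     dataset = PPIYeastSlimDataset()
    #     model = ELEmbeddings(dataset, embed_dim=50, batch_size=4096, device="cpu")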
    def init_module(self):
        raise NotImplementedError
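    # A subclass is expected to override init_module to instantiate the scoring
    # module on self.device. A minimal sketch, where MyELModule is a hypothetical
    # torch.nn.Module implementing one loss per GCI type:
    #
    #     def init_module(self):
    #         self.module = MyELModule(len(self.class_index_dict),
    #                                  len(self.object_property_index_dict),
    #                                  embed_dim=self.embed_dim).to(self.device)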
    def _load_datasets(self):
        """Creates an :class:`mowl.datasets.el.ELDataset` for each subset \
(training, validation and testing) and stores the per-GCI datasets from which \
the corresponding DataLoaders are later built.
        """
        if self._datasets_loaded:
            return

        training_el_dataset = ELDataset(self.dataset.ontology, self.class_index_dict,
                                        self.object_property_index_dict,
                                        extended=self._extended,
                                        load_normalized=self.load_normalized,
                                        device=self.device)

        self._training_datasets = training_el_dataset.get_gci_datasets()

        self._validation_datasets = None
        if self.dataset.validation:
            validation_el_dataset = ELDataset(self.dataset.validation, self.class_index_dict,
                                              self.object_property_index_dict,
                                              extended=self._extended,
                                              device=self.device)

            self._validation_datasets = validation_el_dataset.get_gci_datasets()

        self._testing_datasets = None
        if self.dataset.testing:
            testing_el_dataset = ELDataset(self.dataset.testing, self.class_index_dict,
                                           self.object_property_index_dict,
                                           extended=self._extended,
                                           device=self.device)

            self._testing_datasets = testing_el_dataset.get_gci_datasets()

        self._datasets_loaded = True

    def _load_dataloaders(self):
        if self._dataloaders_loaded:
            return

        self._load_datasets()

        self._training_dataloaders = {
            k: DataLoader(v, batch_size=self.batch_size, pin_memory=False)
            for k, v in self._training_datasets.items()}

        if self._validation_datasets:
            self._validation_dataloaders = {
                k: DataLoader(v, batch_size=self.batch_size, pin_memory=False)
                for k, v in self._validation_datasets.items()}

        if self._testing_datasets:
            self._testing_dataloaders = {
                k: DataLoader(v, batch_size=self.batch_size, pin_memory=False)
                for k, v in self._testing_datasets.items()}

        self._dataloaders_loaded = True

    @property
    def training_datasets(self):
        """Returns the training datasets for each GCI type. Each dataset is an instance \
of :class:`mowl.datasets.el.ELDataset`

        :rtype: dict
        """
        self._load_datasets()
        return self._training_datasets

    @property
    def validation_datasets(self):
        """Returns the validation datasets for each GCI type. Each dataset is an instance \
of :class:`mowl.datasets.el.ELDataset`

        :rtype: dict
        """
        if self.dataset.validation is None:
            raise AttributeError("Validation dataset is None.")

        self._load_datasets()
        return self._validation_datasets

    @property
    def testing_datasets(self):
        """Returns the testing datasets for each GCI type. Each dataset is an instance \
of :class:`mowl.datasets.el.ELDataset`

        :rtype: dict
        """
        if self.dataset.testing is None:
            raise AttributeError("Testing dataset is None.")

        self._load_datasets()
        return self._testing_datasets

    @property
    def training_dataloaders(self):
        """Returns the training dataloaders for each GCI type. Each dataloader is an \
instance of :class:`torch.utils.data.DataLoader`

        :rtype: dict
        """
        self._load_dataloaders()
        return self._training_dataloaders

    @property
    def validation_dataloaders(self):
        """Returns the validation dataloaders for each GCI type. Each dataloader is an \
instance of :class:`torch.utils.data.DataLoader`

        :rtype: dict
        """
        if self.dataset.validation is None:
            raise AttributeError("Validation dataloader is None.")

        self._load_dataloaders()
        return self._validation_dataloaders

    @property
    def testing_dataloaders(self):
        """Returns the testing dataloaders for each GCI type. Each dataloader is an \
instance of :class:`torch.utils.data.DataLoader`

        :rtype: dict
        """
        if self.dataset.testing is None:
            raise AttributeError("Testing dataloader is None.")

        self._load_dataloaders()
        return self._testing_dataloaders
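    # Example (sketch): the dataloader dictionaries are keyed by GCI name
    # ("gci0", "gci1", "gci2", "gci3", plus the "_bot" variants when
    # extended=True), so a training step typically iterates per GCI type:
    #
    #     for gci_name, dataloader in model.training_dataloaders.items():
    #         for batch in dataloader:
    #             loss = model.module(batch, gci_name).mean()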
    @versionadded(version="0.2.0")
    def score(self, axiom):
        """Returns the score of the given axiom.

        :param axiom: The axiom to score.
        :type axiom: :class:`org.semanticweb.owlapi.model.OWLAxiom`
        """

        def data_point_to_tensor(data_point):
            data_point = th.tensor(data_point, dtype=th.long, device=self.device)
            data_point = data_point.unsqueeze(0)
            return data_point

        not_el_error_msg = "This axiom does not belong to the EL description logic specification."

        sub, super_ = axiom.getSubClass(), axiom.getSuperClass()

        if not isinstance(sub, OWLClassExpression):
            raise TypeError("The subclass expression must be of type OWLClassExpression.")

        if isinstance(sub, OWLClass):
            sub_id = self.dataset.class_to_id[sub]

            if isinstance(super_, OWLClass):
                super_id = self.dataset.class_to_id[super_]

                if super_.isOWLNothing() and self._extended:
                    gci_name = "gci0_bot"
                else:
                    gci_name = "gci0"

                gci_data = data_point_to_tensor([sub_id, super_id])

            elif isinstance(super_, OWLObjectSomeValuesFrom):
                rel = super_.getProperty()
                filler = super_.getFiller()

                if not isinstance(filler, OWLClass):
                    raise TypeError(not_el_error_msg)

                rel_id = self.dataset.object_property_to_id[rel]
                filler_id = self.dataset.class_to_id[filler]

                gci_name = "gci2"
                gci_data = data_point_to_tensor([sub_id, rel_id, filler_id])

            else:
                raise TypeError(not_el_error_msg)

        elif isinstance(sub, OWLObjectSomeValuesFrom):
            rel = sub.getProperty()
            filler = sub.getFiller()

            if not isinstance(filler, OWLClass):
                raise TypeError(not_el_error_msg)
            if not isinstance(super_, OWLClass):
                raise TypeError(not_el_error_msg)

            rel_id = self.dataset.object_property_to_id[rel]
            filler_id = self.dataset.class_to_id[filler]
            super_id = self.dataset.class_to_id[super_]

            if super_.isOWLNothing() and self._extended:
                gci_name = "gci3_bot"
            else:
                gci_name = "gci3"

            gci_data = data_point_to_tensor([rel_id, filler_id, super_id])

        elif isinstance(sub, OWLObjectIntersectionOf):
            operands = sub.getOperandsAsList()
            if len(operands) != 2:
                raise TypeError(not_el_error_msg)

            left, right = tuple(operands)
            if not isinstance(left, OWLClass):
                raise TypeError(not_el_error_msg)
            if not isinstance(right, OWLClass):
                raise TypeError(not_el_error_msg)
            if not isinstance(super_, OWLClass):
                raise TypeError(not_el_error_msg)

            left_id = self.dataset.class_to_id[left]
            right_id = self.dataset.class_to_id[right]
            super_id = self.dataset.class_to_id[super_]

            if super_.isOWLNothing() and self._extended:
                gci_name = "gci1_bot"
            else:
                gci_name = "gci1"

            gci_data = data_point_to_tensor([left_id, right_id, super_id])

        else:
            raise TypeError(not_el_error_msg)

        score = self.module(gci_data, gci_name)
        return score
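    # Example (sketch, assuming mOWL's OWLAPIAdapter helpers and example.org
    # IRIs that are present in the dataset's class index): scoring the axiom
    # C SubClassOf D, which is routed to the "gci0" scoring function:
    #
    #     from mowl.owlapi import OWLAPIAdapter
    #
    #     adapter = OWLAPIAdapter()
    #     c = adapter.create_class("http://example.org/C")
    #     d = adapter.create_class("http://example.org/D")
    #     axiom = adapter.create_subclass_of(c, d)
    #     score = model.score(axiom)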
    @property
    def class_embeddings(self):
        class_embeds = {
            k: v for k, v in zip(self.class_index_dict.keys(),
                                 self.module.class_embed.weight.cpu().detach().numpy())}
        return class_embeds

    @property
    def object_property_embeddings(self):
        rel_embeds = {
            k: v for k, v in zip(self.object_property_index_dict.keys(),
                                 self.module.rel_embed.weight.cpu().detach().numpy())}
        return rel_embeds

    @property
    def individual_embeddings(self):
        if self.module.ind_embed is None:
            return dict()

        ind_embeds = {
            k: v for k, v in zip(self.individual_index_dict.keys(),
                                 self.module.ind_embed.weight.cpu().detach().numpy())}
        return ind_embeds
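    # Example (sketch): the embedding properties return plain dictionaries
    # mapping IRI strings to numpy vectors of length embed_dim:
    #
    #     class_vectors = model.class_embeddings
    #     vec = class_vectors["http://example.org/C"]   # hypothetical IRI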
    def add_axioms(self, *axioms):
        """Adds axioms to the dataset and resizes the embedding layers, keeping the \
learned vectors of existing entities and initializing new entities randomly.
        """
        prev_class_embeds = None
        prev_object_property_embeds = None
        prev_individual_embeds = None

        if len(self.class_embeddings) > 0:
            prev_class_embeds = copy.deepcopy(self.class_embeddings)
        if len(self.object_property_embeddings) > 0:
            prev_object_property_embeds = copy.deepcopy(self.object_property_embeddings)
        if len(self.individual_embeddings) > 0:
            prev_individual_embeds = copy.deepcopy(self.individual_embeddings)

        self.dataset.add_axioms(*axioms)

        if prev_class_embeds is not None:
            new_class_embeds = []
            for cls in self.dataset.classes:
                cls = str(cls.toStringID())
                if cls in prev_class_embeds:
                    new_class_embeds.append(prev_class_embeds[cls])
                else:
                    new_class_embeds.append(np.random.normal(size=self.embed_dim))

            new_class_embeds = np.asarray(new_class_embeds)
            self.module.class_embed.weight.data = th.from_numpy(new_class_embeds).float()

        if prev_object_property_embeds is not None:
            new_object_property_embeds = []
            for rel in self.dataset.object_properties:
                rel = str(rel.toStringID())
                if rel in prev_object_property_embeds:
                    new_object_property_embeds.append(prev_object_property_embeds[rel])
                else:
                    new_object_property_embeds.append(np.random.normal(size=self.embed_dim))

            new_object_property_embeds = np.asarray(new_object_property_embeds)
            self.module.rel_embed.weight.data = th.from_numpy(new_object_property_embeds).float()

        if prev_individual_embeds is not None:
            new_individual_embeds = []
            for ind in self.dataset.individuals:
                ind = str(ind.toStringID())
                if ind in prev_individual_embeds:
                    new_individual_embeds.append(prev_individual_embeds[ind])
                else:
                    new_individual_embeds.append(np.random.normal(size=self.embed_dim))

            new_individual_embeds = np.asarray(new_individual_embeds)
            self.module.ind_embed.weight.data = th.from_numpy(new_individual_embeds).float()
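    # Example (sketch, reusing the hypothetical adapter and classes from the
    # score example): axioms added after training keep the learned vectors of
    # existing entities, while newly introduced entities get random vectors:
    #
    #     e = adapter.create_class("http://example.org/E")   # new class
    #     model.add_axioms(adapter.create_subclass_of(e, d))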
    def from_pretrained(self, model):
        if not isinstance(model, str):
            raise TypeError("Parameter 'model' must be a string pointing to the model file.")

        if not os.path.exists(model):
            raise FileNotFoundError("Pretrained model path does not exist.")

        self._is_pretrained = True
        self.module.load_state_dict(th.load(model))
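    # Example (sketch, hypothetical checkpoint path): loading weights saved from
    # an earlier run before scoring or evaluation:
    #
    #     model.from_pretrained("elembeddings_checkpoint.pt")
    #     score = model.score(axiom)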
    def load_pairwise_eval_data(self):
        if self._loaded_eval:
            return

        eval_property = self.dataset.get_evaluation_property()
        head_classes, tail_classes = self.dataset.evaluation_classes
        self._head_entities = head_classes.as_str
        self._tail_entities = tail_classes.as_str

        eval_projector = projector_factory('taxonomy_rels', taxonomy=False,
                                           relations=[eval_property])

        self._training_set = eval_projector.project(self.dataset.ontology)
        self._testing_set = eval_projector.project(self.dataset.testing)

        self._loaded_eval = True
    @property
    def training_set(self):
        self.load_pairwise_eval_data()
        return self._training_set

    @property
    def testing_set(self):
        self.load_pairwise_eval_data()
        return self._testing_set

    @property
    def head_entities(self):
        self.load_pairwise_eval_data()
        return self._head_entities

    @property
    def tail_entities(self):
        self.load_pairwise_eval_data()
        return self._tail_entities
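# Example (sketch): the pairwise evaluation accessors lazily project the training
# and testing ontologies into edges over the evaluation property:
#
#     train_edges = model.training_set    # projected (head, relation, tail) edges
#     test_edges = model.testing_set
#     heads, tails = model.head_entities, model.tail_entities   # IRI strings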