import numpy as np
import torch as th
from mowl.utils.data import FastTensorDataLoader
from mowl.error import messages as msg
import logging
from deprecated.sphinx import versionchanged
import logging
# Module-level logger. Note that a StreamHandler is attached and the level is
# forced to INFO at import time, so INFO messages from this module are emitted
# without any logging configuration by the application.
logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)
# (Sphinx "[docs]" source-link marker removed: extraction artifact, not code.)
class BaseRankingEvaluator():
    """
    Base class for ranking evaluation of ontology embedding methods.

    Candidates are ranked with an ascending sort of their scores, so a lower
    score means a better rank. Filtered metrics are obtained by multiplying the
    score of every known pair by a large constant before ranking.

    NOTE(review): the multiplicative filter only demotes a candidate when the
    scores are positive -- confirm this holds for the evaluated model.
    """

    def __init__(self, heads, tails, batch_size, device):
        """
        :param heads: The indices of the head entities.
        :type heads: :class:`torch.Tensor`
        :param tails: The indices of the tail entities.
        :type tails: :class:`torch.Tensor`
        :param batch_size: The batch size for evaluation.
        :type batch_size: int
        :param device: The device to use for evaluation.
        :type device: str
        """
        self.batch_size = batch_size
        self.device = device
        self.heads = heads.to(self.device)
        self.tails = tails.to(self.device)

        # All further checks use the device-resident copies so that inputs
        # created on another device do not trigger device-mismatch errors.
        assert (self.heads == th.sort(self.heads)[0]).all(), "Heads must be sorted."
        assert (self.tails == th.sort(self.tails)[0]).all(), "Tails must be sorted."

        if len(heads) != len(tails):
            logger.warning("Detected a different number of evaluation heads and tails. AUC metric will not be accurate in this case if you evaluate in mode='both'")

        # entity id -> position in ``self.heads`` / ``self.tails`` (-1: absent)
        self.mapped_heads = self._build_index_map(self.heads, "Head")
        self.mapped_tails = self._build_index_map(self.tails, "Tail")

        # 1 everywhere; known pairs are later set to a large multiplier.
        self.filtering_labels = th.ones((len(self.heads), len(self.tails)), device=self.device)

    def _build_index_map(self, entity_ids, side):
        """Return a lookup tensor mapping entity ids to their position in ``entity_ids``.

        When ``entity_ids`` is exactly ``0..n-1`` the tensor itself is the
        identity map and is returned unchanged. Otherwise a tensor of size
        ``max_id + 1`` filled with ``-1`` is created and the known ids are
        assigned their positions.
        """
        positions = th.arange(len(entity_ids), dtype=th.long, device=self.device)
        if (entity_ids == positions).all():
            return entity_ids

        logger.info(f"{side} indices are incomplete. This is normal if you are predicting over a subset of entities.")
        lookup = -th.ones(entity_ids.max().item() + 1, dtype=th.long, device=self.device)
        lookup[entity_ids] = positions
        return lookup

    def update_filtering_labels(self, data):
        """
        Mark the (head, tail) pairs found in ``data`` as known pairs.

        Pairs whose head or tail is not one of the evaluation entities are
        ignored, including ids outside the range covered by the lookup tables.
        Marks accumulate across calls.

        :param data: Tensor with 2 columns (head, tail) or 3 columns (head, relation, tail), or ``None`` to do nothing.
        :type data: :class:`torch.Tensor`
        :raises ValueError: If ``data`` does not have 2 or 3 columns.
        """
        if data is None:
            return

        if data.shape[1] == 2:
            heads, tails = data[:, 0], data[:, 1]
        elif data.shape[1] == 3:
            heads, tails = data[:, 0], data[:, 2]
        else:
            raise ValueError("Data must have 2 or 3 columns.")

        heads = heads.to(self.device)
        tails = tails.to(self.device)

        # Drop ids the lookup tables cannot index: too large would raise
        # IndexError, negative would silently wrap around.
        in_range = (
            (heads >= 0) & (heads < len(self.mapped_heads))
            & (tails >= 0) & (tails < len(self.mapped_tails))
        )
        mapped_heads = self.mapped_heads[heads[in_range]]
        mapped_tails = self.mapped_tails[tails[in_range]]

        known = (mapped_heads != -1) & (mapped_tails != -1)
        self.filtering_labels[mapped_heads[known], mapped_tails[known]] = 10000

    def get_scores(self, evaluation_model, batch):
        """Score ``batch`` by calling ``evaluation_model`` on it; meant to be overridden."""
        logger.warning("Your are using a generic `get_scores` method. Please implement a specific one for your model.")
        return evaluation_model(batch)

    def get_expanded_scores(self, evaluation_model, batch, mode):
        """
        Score every test row against all candidate tails and/or heads.

        The candidates passed to :meth:`get_scores` are the entity ids stored
        in ``self.tails`` / ``self.heads`` (the same id space as ``batch``), so
        column ``j`` of a returned matrix is the score of the ``j``-th
        evaluation entity.

        :param evaluation_model: Model forwarded to :meth:`get_scores`.
        :param batch: Tensor with 2 columns (head, tail) or 3 columns (head, relation, tail).
        :type batch: :class:`torch.Tensor`
        :param mode: ``"head_centric"``, ``"tail_centric"`` or ``"both"``.
        :type mode: str
        :return: ``(head_scores, tail_scores)``; the side that was not requested is ``None``.
        :rtype: tuple
        :raises ValueError: If the batch has an unsupported number of columns or ``mode`` is invalid.
        """
        batch_rels = None
        if batch.shape[1] == 2:
            batch_heads, batch_tails = batch[:, 0], batch[:, 1]
        elif batch.shape[1] == 3:
            batch_heads, batch_rels, batch_tails = batch[:, 0], batch[:, 1], batch[:, 2]
        else:
            raise ValueError("Batch must have 2 or 3 columns.")

        if mode not in ("head_centric", "tail_centric", "both"):
            raise ValueError(f"Invalid mode: {mode}")

        head_scores, tail_scores = None, None

        if mode in ("head_centric", "both"):
            num_candidates = len(self.tails)
            columns = [batch_heads.repeat_interleave(num_candidates).unsqueeze(1)]
            if batch_rels is not None:
                columns.append(batch_rels.repeat_interleave(num_candidates).unsqueeze(1))
            columns.append(self.tails.repeat(len(batch_heads)).unsqueeze(1))
            head_scores = self.get_scores(evaluation_model, th.cat(columns, dim=1))
            head_scores = head_scores.view(-1, num_candidates)

        if mode in ("tail_centric", "both"):
            num_candidates = len(self.heads)
            columns = [self.heads.repeat(len(batch_tails)).unsqueeze(1)]
            if batch_rels is not None:
                columns.append(batch_rels.repeat_interleave(num_candidates).unsqueeze(1))
            columns.append(batch_tails.repeat_interleave(num_candidates).unsqueeze(1))
            tail_scores = self.get_scores(evaluation_model, th.cat(columns, dim=1))
            tail_scores = tail_scores.view(-1, num_candidates)

        return head_scores, tail_scores

    @staticmethod
    def _rank_of(scores, target):
        """Return the 1-based position of index ``target`` in the ascending sort of ``scores``."""
        order = th.argsort(scores, descending=False)
        return th.where(order == target)[0].item() + 1

    def _record_rank(self, stats, preds, filter_weights, target):
        """Rank ``target`` in raw and filtered scores and add the result to ``stats``."""
        rank = self._rank_of(preds, target)
        f_rank = self._rank_of(preds * filter_weights, target)

        stats["mr"] += rank
        stats["mrr"] += 1 / rank
        stats["fmr"] += f_rank
        stats["fmrr"] += 1 / f_rank

        for k in stats["hits_k"]:
            if rank <= int(k):
                stats["hits_k"][k] += 1
        for k in stats["f_hits_k"]:
            if f_rank <= int(k):
                stats["f_hits_k"][k] += 1

        stats["ranks"][rank] = stats["ranks"].get(rank, 0) + 1
        stats["franks"][f_rank] = stats["franks"].get(f_rank, 0) + 1

    @th.no_grad()
    def compute_ranking_metrics(self, evaluation_model, test_data, filter_data=None, mode="head_centric"):
        """
        Compute the ranking metrics for the evaluation model on the test data.

        :param evaluation_model: The evaluation model.
        :type evaluation_model: :class:`torch.nn.Module`
        :param test_data: The test data containing the indices of the embeddings
        :type test_data: :class:`torch.Tensor`
        :param filter_data: The filter data containing the indices of the embeddings
        :type filter_data: :class:`torch.Tensor`
        :param mode: The mode of the evaluation.
        :type mode: str
        :return: The computed ranking metrics.
        :rtype: dict
        :raises ValueError: If ``mode`` is invalid, ``test_data`` is empty or a batch has an unsupported shape.
        """
        if mode not in ["head_centric", "tail_centric", "both"]:
            raise ValueError("Invalid mode. Choose between 'head_centric', 'tail_centric' or 'both'.")
        if len(test_data) == 0:
            # Every metric is an average over the test rows.
            raise ValueError("test_data is empty; ranking metrics cannot be computed.")

        logger.debug(f"Computing ranking metrics in {mode} mode.")
        logger.debug(f"Test data shape: {test_data.shape}")

        evaluation_model.to(self.device)
        evaluation_model.eval()

        num_heads = len(self.heads)
        num_tails = len(self.tails)

        self.update_filtering_labels(filter_data)

        dataloader = FastTensorDataLoader(test_data, batch_size=self.batch_size, shuffle=False)

        stats = {
            "mr": 0, "mrr": 0, "fmr": 0, "fmrr": 0,
            "ranks": dict(), "franks": dict(),
            "hits_k": {"1": 0, "3": 0, "10": 0, "50": 0, "100": 0},
            "f_hits_k": {"1": 0, "3": 0, "10": 0, "50": 0, "100": 0},
        }

        for batch, in dataloader:
            if batch.shape[1] not in (2, 3):
                raise ValueError("Batch shape must be either (n, 2) or (n, 3)")

            batch = batch.to(self.device)
            head_scores, tail_scores = self.get_expanded_scores(evaluation_model, batch, mode)

            # Position of each test head/tail among the evaluation entities;
            # the tail is always the last column.
            positions = [
                (th.where(self.heads == head)[0].item(), th.where(self.tails == tail)[0].item())
                for head, tail in zip(batch[:, 0], batch[:, -1])
            ]

            if head_scores is not None:
                for i, (head_pos, tail_pos) in enumerate(positions):
                    self._record_rank(stats, head_scores[i], self.filtering_labels[head_pos], tail_pos)

            if tail_scores is not None:
                for i, (head_pos, tail_pos) in enumerate(positions):
                    self._record_rank(stats, tail_scores[i], self.filtering_labels[:, tail_pos], head_pos)

        # In "both" mode every test row contributes two ranks.
        divisor = 2 if mode == "both" else 1
        normalizer = divisor * len(test_data)

        if mode == "both":
            num_entities_for_auc = 0.5 * (num_heads + num_tails)
        elif mode == "head_centric":
            num_entities_for_auc = num_tails
        else:
            num_entities_for_auc = num_heads

        metrics = dict()
        metrics["mr"] = stats["mr"] / normalizer
        metrics["mrr"] = stats["mrr"] / normalizer
        metrics["f_mr"] = stats["fmr"] / normalizer
        metrics["f_mrr"] = stats["fmrr"] / normalizer
        metrics["auc"] = compute_rank_roc(stats["ranks"], num_entities_for_auc)
        metrics["f_auc"] = compute_rank_roc(stats["franks"], num_entities_for_auc)

        for k, count in stats["hits_k"].items():
            metrics[f"hits@{k}"] = count / normalizer
        for k, count in stats["f_hits_k"].items():
            metrics[f"f_hits@{k}"] = count / normalizer

        return metrics
# (Sphinx "[docs]" source-link marker removed: extraction artifact, not code.)
class RankingEvaluator(BaseRankingEvaluator):
    """
    Ranking evaluation class for ontology embedding methods. It encapsulates :class:`BaseRankingEvaluator` to support mOWL datasets
    """

    def __init__(self, dataset, batch_size=16, device="cpu"):
        """
        :param dataset: The mOWL dataset object.
        :type dataset: :class:`mowl.datasets.base.Dataset`
        :param batch_size: The batch size for evaluation.
        :type batch_size: int
        :param device: The device to use for evaluation.
        :type device: str
        """
        self.dataset = dataset

        # Bidirectional maps between entity names and their enumeration index.
        self.class_to_id = {c: i for i, c in enumerate(self.dataset.classes.as_str)}
        self.id_to_class = {i: c for c, i in self.class_to_id.items()}
        self.relation_to_id = {r: i for i, r in enumerate(self.dataset.object_properties.as_str)}
        self.id_to_relation = {i: r for r, i in self.relation_to_id.items()}

        eval_heads, eval_tails = self.dataset.evaluation_classes
        head_ids = [self.class_to_id[c] for c in eval_heads.as_str]
        tail_ids = [self.class_to_id[c] for c in eval_tails.as_str]

        # class id -> position among the evaluation heads / tails
        self.class_id_to_head_id = {class_id: i for i, class_id in enumerate(head_ids)}
        self.class_id_to_tail_id = {class_id: i for i, class_id in enumerate(tail_ids)}

        evaluation_heads_tensor = th.tensor(head_ids, dtype=th.long).to(device)
        evaluation_tails_tensor = th.tensor(tail_ids, dtype=th.long).to(device)
        super().__init__(evaluation_heads_tensor, evaluation_tails_tensor, batch_size, device)

    def create_tuples(self, ontology):
        """
        Create tuples from the ontology.

        :param ontology: The ontology.
        :type ontology: :class:`org.semanticweb.owlapi.model.OWLOntology`
        :return: The created tuples.
        :rtype: :class:`torch.Tensor`
        :raises NotImplementedError: Always; this method must be overridden.
        """
        raise NotImplementedError

    def evaluate(self, evaluation_model, testing_ontology, filter_ontologies=None, mode="head_centric"):
        """
        Evaluate the model on the testing ontology.

        :param testing_ontology: The testing ontology.
        :type testing_ontology: :class:`org.semanticweb.owlapi.model.OWLOntology`
        :param filter_ontologies: The filter ontologies. ``None`` or an empty collection disables filtering.
        :type filter_ontologies: list, optional
        :param mode: The mode of the evaluation.
        :type mode: str
        :return: The computed ranking metrics.
        :rtype: dict
        """
        testing_data = self.create_tuples(testing_ontology)

        filter_data = None
        # An empty collection is treated like None: concatenating an empty
        # list of tensors raises.
        if filter_ontologies:
            filter_data = th.cat([self.create_tuples(ontology) for ontology in filter_ontologies], dim=0)

        return self.compute_ranking_metrics(evaluation_model, testing_data, filter_data=filter_data, mode=mode)
# (Sphinx "[docs]" source-link marker removed: extraction artifact, not code.)
@versionchanged(version="1.0.0", reason="Updated Evaluator with a new API.")
class Evaluator:
    """
    Base evaluation class for ontology embedding methods.

    ``create_tuples`` and ``get_logits`` are stubs here. ``evaluate_base`` also
    calls ``get_filtering_labels`` and ``get_deductive_labels``, which this
    class does not define and which must be provided elsewhere (e.g. by a
    subclass).

    :param dataset: mOWL dataset object. Required to obtain the ontology entities (classes, individuals, object properties, etc.).
    :type dataset: :class:`mowl.datasets.base.Dataset`
    :param device: Device to use for the evaluation. Defaults to 'cpu'.
    :type device: str, optional
    :param batch_size: Batch size for evaluation. Defaults to 16.
    :type batch_size: int, optional
    """

    def __init__(self, dataset, device="cpu", batch_size=16):
        self.dataset = dataset
        self.device = device
        self.batch_size = batch_size

        self.train_tuples = self.create_tuples(dataset.ontology)
        self.valid_tuples = self.create_tuples(dataset.validation)
        self.test_tuples = self.create_tuples(dataset.testing)
        # Built lazily by the ``deductive_closure_tuples`` property.
        self._deductive_closure_tuples = None

        self.class_to_id = {c: i for i, c in enumerate(self.dataset.classes.as_str)}
        self.id_to_class = {i: c for c, i in self.class_to_id.items()}
        self.relation_to_id = {r: i for i, r in enumerate(self.dataset.object_properties.as_str)}
        self.id_to_relation = {i: r for r, i in self.relation_to_id.items()}

        eval_heads, eval_tails = self.dataset.evaluation_classes
        # class id -> position among the evaluation heads / tails
        self.class_id_to_head_id = {self.class_to_id[c]: i for i, c in enumerate(eval_heads.as_str)}
        self.class_id_to_tail_id = {self.class_to_id[c]: i for i, c in enumerate(eval_tails.as_str)}

        print(f"Number of evaluation classes: {len(eval_heads)}")
        self.evaluation_heads = th.tensor([self.class_to_id[c] for c in eval_heads.as_str], dtype=th.long).to(self.device)
        self.evaluation_tails = th.tensor([self.class_to_id[c] for c in eval_tails.as_str], dtype=th.long).to(self.device)

    @property
    def deductive_closure_tuples(self):
        """Tuples of ``dataset.deductive_closure_ontology``, created on first access and cached."""
        if self._deductive_closure_tuples is None:
            self._deductive_closure_tuples = self.create_tuples(self.dataset.deductive_closure_ontology)
        return self._deductive_closure_tuples

    def create_tuples(self, ontology):
        """Convert ``ontology`` into a tensor of index tuples; must be overridden."""
        raise NotImplementedError

    def get_logits(self, *args, **kwargs):
        """Return ``(logits_heads, logits_tails)`` for a batch; must be overridden.

        ``evaluate_base`` calls this as ``get_logits(model, batch, **kwargs)``.
        Either element may be ``None`` to skip that side.
        """
        raise NotImplementedError

    @staticmethod
    def _rank_of(scores, target):
        """Return the 1-based position of index ``target`` in the ascending sort of ``scores``."""
        order = th.argsort(scores, descending=False)
        return th.where(order == target)[0].item() + 1

    def evaluate_base(self, model, eval_tuples, mode="test",
                      include_deductive_closure=False,
                      exclude_testing_set=False,
                      filter_deductive_closure=False,
                      **kwargs):
        """
        Rank every tuple in ``eval_tuples`` and aggregate ranking metrics.

        Mean rank and mean reciprocal rank are always computed. In ``"test"``
        mode the filtered variants, hits@k and AUC are added as well. All keys
        of the returned dictionary are prefixed with ``"{mode}_"``.

        :param model: Model to evaluate; moved to ``self.device`` and set to eval mode.
        :param eval_tuples: Tensor of shape (n, 2) or (n, 3) with head in the first and tail in the last column.
        :type eval_tuples: :class:`torch.Tensor`
        :param mode: Either ``"valid"`` or ``"test"``.
        :type mode: str
        :param include_deductive_closure: Add deductive-closure tuples not present in training/validation to the evaluated tuples.
        :type include_deductive_closure: bool, optional
        :param exclude_testing_set: With ``include_deductive_closure``, evaluate only on the deductive-closure tuples.
        :type exclude_testing_set: bool, optional
        :param filter_deductive_closure: Forwarded to ``get_filtering_labels``.
        :type filter_deductive_closure: bool, optional
        :param kwargs: Forwarded to ``get_logits`` and ``get_deductive_labels``.
        :return: The computed metrics.
        :rtype: dict
        :raises ValueError: If ``mode`` is invalid, a batch has an unsupported shape, or nothing was scored.
        """
        model = model.to(self.device)
        num_heads, num_tails = len(self.evaluation_heads), len(self.evaluation_tails)
        model.eval()

        if mode not in ["valid", "test"]:
            raise ValueError(f"Mode must be either 'valid' or 'test', not {mode}")

        if include_deductive_closure:
            # Keep only closure tuples that are in neither training nor validation.
            mask1 = (self.deductive_closure_tuples.unsqueeze(1) == self.train_tuples).all(dim=-1).any(dim=-1)
            mask2 = (self.deductive_closure_tuples.unsqueeze(1) == self.valid_tuples).all(dim=-1).any(dim=-1)
            mask = mask1 | mask2
            deductive_closure_tuples = self.deductive_closure_tuples[~mask]

            if exclude_testing_set:
                eval_tuples = deductive_closure_tuples  # only deductive closure
            else:
                eval_tuples = th.cat([eval_tuples, deductive_closure_tuples], dim=0)

        dataloader = FastTensorDataLoader(eval_tuples, batch_size=self.batch_size, shuffle=False)

        is_test = mode == "test"
        totals = {"mr": 0, "mrr": 0, "fmr": 0, "fmrr": 0}
        ranks, franks = dict(), dict()
        hits_k, f_hits_k = dict(), dict()
        filtering_labels = None
        deductive_labels = None

        if is_test:
            hits_k = {"1": 0, "3": 0, "10": 0, "50": 0, "100": 0}
            f_hits_k = {"1": 0, "3": 0, "10": 0, "50": 0, "100": 0}
            filtering_labels = self.get_filtering_labels(num_heads,
                                                         num_tails,
                                                         self.class_id_to_head_id,
                                                         self.class_id_to_tail_id,
                                                         filter_deductive_closure=filter_deductive_closure)

        if include_deductive_closure:
            deductive_labels = self.get_deductive_labels(num_heads, num_tails, **kwargs)

        def record(preds, target, select):
            """Rank ``target`` in ``preds``; ``select`` picks this query's row/column of a label matrix."""
            ded_labels = None
            if deductive_labels is not None:
                # When evaluating with deductive closure axioms, the other
                # closure axioms are filtered for the axiom under test.
                # Otherwise many true axioms could be scored at the top and a
                # good hits@1 could never be reached.
                # Clone first: indexing yields a view, and un-filtering the
                # target in place would permanently alter the shared matrix.
                ded_labels = select(deductive_labels).to(preds.device).clone()
                ded_labels[target] = 1
                preds = preds * ded_labels

            rank = self._rank_of(preds, target)
            totals["mr"] += rank
            totals["mrr"] += 1 / rank

            if not is_test:
                return

            f_preds = preds * select(filtering_labels).to(preds.device)
            if ded_labels is not None:
                f_preds = f_preds * ded_labels
            f_rank = self._rank_of(f_preds, target)
            totals["fmr"] += f_rank
            totals["fmrr"] += 1 / f_rank

            for k in hits_k:
                if rank <= int(k):
                    hits_k[k] += 1
            for k in f_hits_k:
                if f_rank <= int(k):
                    f_hits_k[k] += 1

            ranks[rank] = ranks.get(rank, 0) + 1
            franks[f_rank] = franks.get(f_rank, 0) + 1

        # Sides for which logits were produced. Tracked as a set so the
        # normaliser does not depend on the number of batches.
        scored_sides = set()

        with th.no_grad():
            for batch, in dataloader:
                if batch.shape[1] not in (2, 3):
                    raise ValueError("Batch shape must be either (n, 2) or (n, 3)")

                batch = batch.to(self.device)
                logits_heads, logits_tails = self.get_logits(model, batch, **kwargs)
                if logits_heads is None and logits_tails is None:
                    continue

                # Position of each head/tail among the evaluation entities;
                # the tail is always the last column.
                positions = [
                    (th.where(self.evaluation_heads == head)[0].item(),
                     th.where(self.evaluation_tails == tail)[0].item())
                    for head, tail in zip(batch[:, 0], batch[:, -1])
                ]

                if logits_heads is not None:
                    scored_sides.add("heads")
                    for i, (head, tail) in enumerate(positions):
                        record(logits_heads[i], tail, lambda labels, h=head: labels[h])

                if logits_tails is not None:
                    scored_sides.add("tails")
                    for i, (head, tail) in enumerate(positions):
                        record(logits_tails[i], head, lambda labels, t=tail: labels[:, t])

        num_sides = len(scored_sides)
        if num_sides == 0:
            raise ValueError("No predictions were scored: `eval_tuples` is empty or "
                             "`get_logits` returned None for both sides.")
        normalizer = num_sides * len(eval_tuples)

        metrics = dict()
        metrics["mr"] = totals["mr"] / normalizer
        metrics["mrr"] = totals["mrr"] / normalizer

        if is_test:
            metrics["f_mr"] = totals["fmr"] / normalizer
            metrics["f_mrr"] = totals["fmrr"] / normalizer
            metrics["auc"] = compute_rank_roc(ranks, num_tails)
            metrics["f_auc"] = compute_rank_roc(franks, num_tails)

            for k, count in hits_k.items():
                metrics[f"hits@{k}"] = count / normalizer
            for k, count in f_hits_k.items():
                metrics[f"f_hits@{k}"] = count / normalizer

        metrics = {f"{mode}_{k}": v for k, v in metrics.items()}
        return metrics

    def evaluate(self, *args,
                 include_deductive_closure=False,
                 exclude_testing_set=False,
                 filter_deductive_closure=False,
                 **kwargs):
        """
        Evaluate a model on the validation or testing tuples.

        The first positional argument is the model. If ``mode="valid"`` is
        passed as a keyword the validation tuples are used, otherwise the
        testing tuples. Remaining keyword arguments are forwarded to
        :meth:`evaluate_base`.

        :param include_deductive_closure: Whether to evaluate using deductive closure axioms as positives. Defaults to False.
        :type include_deductive_closure: bool, optional
        :param exclude_testing_set: Whether to exclude the testing set from the evaluation. Defaults to False.
        :type exclude_testing_set: bool, optional
        :param filter_deductive_closure: Whether to filter deductive closure axioms from the evaluation. Defaults to False.
        :type filter_deductive_closure: bool, optional
        :raises TypeError: If any of the three flags is not a bool.
        """
        if not isinstance(include_deductive_closure, bool):
            raise TypeError(msg.get_type_error_message("include_deductive_closure", "bool", type(include_deductive_closure)))
        if not isinstance(exclude_testing_set, bool):
            raise TypeError(msg.get_type_error_message("exclude_testing_set", "bool", type(exclude_testing_set)))
        if not isinstance(filter_deductive_closure, bool):
            raise TypeError(msg.get_type_error_message("filter_deductive_closure", "bool", type(filter_deductive_closure)))

        logger.info(f"Evaluating in device: {self.device}")
        logger.info(f"Evaluating with deductive closure: {include_deductive_closure}")
        logger.info(f"Excluding testing set: {exclude_testing_set}")
        logger.info(f"Filtering deductive closure: {filter_deductive_closure}")

        model = args[0]
        mode = kwargs.get("mode")

        if mode == "valid":
            eval_tuples = self.valid_tuples
        else:
            eval_tuples = self.test_tuples

        return self.evaluate_base(model, eval_tuples,
                                  include_deductive_closure=include_deductive_closure,
                                  exclude_testing_set=exclude_testing_set,
                                  filter_deductive_closure=filter_deductive_closure,
                                  **kwargs)
def compute_rank_roc(ranks, num_entities, method="riemann"):
    """
    Compute the area under the cumulative rank curve, normalised by ``num_entities - 1``.

    :param ranks: Mapping from 1-based rank to the number of times it was observed.
    :type ranks: dict
    :param num_entities: Number of ranked candidates; truncated to ``int``.
    :type num_entities: int or float
    :param method: Integration rule, ``"riemann"`` (left Riemann sum) or ``"trapz"`` (trapezoidal rule).
    :type method: str
    :return: The normalised area.
    :rtype: float
    :raises ValueError: If ``method`` is not recognized or ``ranks`` is empty.
    """
    if method == "riemann":
        fn = riemann_sum
    elif method == "trapz":
        # ``np.trapz`` is deprecated since NumPy 2.0 in favour of
        # ``np.trapezoid``; prefer the new name and fall back on older NumPy.
        fn = getattr(np, "trapezoid", None) or np.trapz
    else:
        raise ValueError(f"Method {method} not recognized.")

    if not ranks:
        raise ValueError("`ranks` is empty; the rank AUC cannot be computed.")

    num_entities = int(num_entities)

    # Shift to 0-based ranks so the x axis starts at 0 for a perfect rank.
    shifted = {rank - 1: count for rank, count in ranks.items()}
    min_rank = min(shifted)
    assert min_rank >= 0

    # Fill the unobserved ranks between the smallest observed rank and
    # ``num_entities - 1`` with zero counts.
    histogram = {rank: 0 for rank in range(min_rank, num_entities)}
    histogram.update(shifted)

    auc_x = sorted(histogram)
    total = sum(histogram.values())

    # Cumulative fraction of observations with rank <= x.
    auc_y = []
    cumulative = 0
    for x in auc_x:
        cumulative += histogram[x]
        auc_y.append(cumulative / total)

    auc = fn(auc_y, auc_x) / (num_entities - 1)
    return auc
def riemann_sum(y, x):
    """Return the left Riemann sum of the samples ``y`` taken at abscissas ``x``."""
    widths = np.diff(x)
    # Each rectangle takes its height from the sample at its left edge, so
    # the last sample is not used.
    left_heights = y[:-1]
    return np.sum(left_heights * widths)