Source code for mowl.projection.categorical.model

import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
    
from .edge import Edge, Node
from .utils import IGNORED_AXIOM_TYPES, IGNORED_EXPRESSION_TYPES, pairs

from mowl.projection.base import ProjectionModel
from mowl.owlapi import OWLAPIAdapter
from mowl.owlapi.defaults import BOT, TOP
import mowl.error.messages as msg
from mowl.projection import Edge as mEdge

from org.semanticweb.owlapi.model import  AxiomType, EntityType, OWLObjectInverseOf, OWLOntology
from org.semanticweb.owlapi.model import ClassExpressionType as CT

import networkx as nx
from deprecated.sphinx import versionadded

from tqdm import tqdm

# Module-wide OWLAPI adapter; Graph uses it to build class expressions during saturation.
adapter = OWLAPIAdapter()

# Nodes wrapping the classes created from the TOP and BOT constants. Graph.add_node wires
# bot_node -> every node -> top_node, and Graph.is_unsatisfiable tests for an arrow into bot_node.
top_node = Node(owl_class = adapter.create_class(TOP))
bot_node = Node(owl_class = adapter.create_class(BOT))

class Graph():
    def __init__(self, abox_edges = None):
        self._node_to_id = {}
        self._id_to_node = {}
        self._out_edges = dict()
        self._in_edges = dict()

        if abox_edges is None:
            self.abox_edges = []
        else:
            self.abox_edges = abox_edges
        
    @property
    def node_to_id(self):
        """Dictionary mapping each registered node to its integer id (the live internal dict)."""
        return self._node_to_id

    @property
    def id_to_node(self):
        """Dictionary mapping each integer id back to its node (the live internal dict)."""
        return self._id_to_node
    
    @property
    def nodes(self):
        """A new set holding every registered node; built from scratch on each access."""
        return set(self.node_to_id.keys())

    @property
    def out_edges(self):
        """Dictionary mapping each node to the set of its successors (the live internal dict)."""
        return self._out_edges

    @property
    def in_edges(self):
        """Dictionary mapping each node to the set of its predecessors (the live internal dict)."""
        return self._in_edges

    def add_node(self, node):
        """Register ``node`` in the graph together with its structural arrows.

        Nothing happens when the node is already registered, or when it is the top or the
        bottom node itself (those two are registered lazily below, the first time any other
        node is added).

        For a new node the method assigns the next integer id, adds a self-loop, the arrows
        ``bot_node -> node`` and ``node -> top_node``, and finally registers ``node.nnf()``.
        For plain named classes it additionally adds the arrows relating the class, its
        negation, their intersection and their union.

        :param node: The node to register.
        :type node: Node
        :raises TypeError: If ``node`` is not a :class:`Node`.
        """

        # Already known: this is also what stops the recursion through nnf()/add_edge below.
        if node in self.node_to_id:
            return
        
        if not isinstance(node, Node):
            raise TypeError(f"Node must be of type Node. Got {type(node)}")

        # Top and bottom are never registered on request, only as a side effect below.
        if node.is_owl_thing():
            return
        if node.is_owl_nothing():
            return
        
        # Lazily register the bottom node, already pointing to top.
        if bot_node not in self.node_to_id:
            self.node_to_id[bot_node] = len(self.node_to_id)
            self.id_to_node[self.node_to_id[bot_node]] = bot_node
            self.out_edges[bot_node] = set()
            self.out_edges[bot_node].add(top_node)
            self.in_edges[bot_node] = set()

        # Lazily register the top node, already pointed to by bottom.
        if top_node not in self.node_to_id:
            self.node_to_id[top_node] = len(self.node_to_id)
            self.id_to_node[self.node_to_id[top_node]] = top_node
            self.out_edges[top_node] = set()
            self.in_edges[top_node] = set()
            self.in_edges[top_node].add(bot_node)

            
        # Ids are assigned in insertion order: the next id is the current number of nodes.
        if not node in self.node_to_id:
            self.node_to_id[node] = len(self.node_to_id)
            self.id_to_node[self.node_to_id[node]] = node
            self.out_edges[node] = set()
            self.in_edges[node] = set()

        # Identity arrow.
        self.in_edges[node].add(node)
        self.out_edges[node].add(node)

        # bottom -> node
        self.in_edges[node].add(bot_node)
        self.out_edges[bot_node].add(node)

        # node -> top
        self.in_edges[top_node].add(node)
        self.out_edges[node].add(top_node)


                    
        # Only for named classes (expression type OWL_CLASS) that are neither a domain nor a
        # codomain node: relate the class to its negation through their intersection and union.
        if node.in_object_category() and not node.domain and not node.codomain and node.owl_class.getClassExpressionType() == CT.OWL_CLASS:
            negated = node.negate()
            and_node = adapter.create_object_intersection_of(node.owl_class, negated.owl_class)
            and_node = Node(owl_class = and_node)
            # The intersection points to bottom and to both of its operands.
            self.add_edge(Edge(and_node, "http://arrow", bot_node))
            self.add_edge(Edge(and_node, "http://arrow", node))
            self.add_edge(Edge(and_node, "http://arrow", negated))

            or_node = adapter.create_object_union_of(node.owl_class, negated.owl_class)
            or_node = Node(owl_class = or_node)
            # Top and both operands point to the union.
            self.add_edge(Edge(top_node, "http://arrow", or_node))
            self.add_edge(Edge(node, "http://arrow", or_node))
            self.add_edge(Edge(negated, "http://arrow", or_node))
            
        # Also register the node returned by nnf(); a no-op when it is already registered.
        self.add_node(node.nnf())
                                        
            
    def add_edge(self, edge):
        src = edge.src
        dst = edge.dst
        self.add_node(src)
        self.add_node(dst)
                                                    
        self.out_edges[src].add(dst)
        self.in_edges[dst].add(src)

    def add_all_edges(self, *edges):
        for edge in edges:
            self.add_edge(edge)

    def as_edgelist(self):
        edges = []
        for source, targets in self.out_edges.items():
            for target in targets:
                edges.append((source, target))
        return edges

    def as_str_edgelist(self):
        edges = []
        for source, targets in self.out_edges.items():
            source = str(source)
            for target in targets:
                target = str(target)
                edges.append((source, "http://arrow", target))
        for source, target in self.abox_edges:
            source = str(source)
            target = str(target)
            edges.append((source, "http://type", target))

        edges  = [mEdge(*edge) for edge in edges]
        return edges

    def as_edges(self):
        for source, targets in self.out_edges.items():
            for target in targets:
                yield Edge(source, "http://arrow", target)

        for source, target in self.abox_edges:
            yield Edge(source, "http://type", target)
            
    def _lemma_6(self):
        """Saturate the graph with the negation rules of Lemma 6 in [brieulle2022]_.

        Three passes are made, each over a snapshot of the nodes present when it starts:

        * negated nodes ``not A``: every arrow ``X -> not A`` yields ``A -> not X`` and
          ``(A and X) -> bottom``; every arrow ``not A -> Y`` yields ``top -> (A or Y)``;
        * intersections that already point to bottom: every split of the operands returned
          by ``pairs`` yields ``left -> not right``;
        * unions that top already points to: every split yields ``not left -> right``.

        Each derived edge is added in the loop that derives it. Previously the two edges
        derived from incoming arrows were only added inside the outgoing-arrow loop, which
        raised ``NameError`` when they had never been built and otherwise re-added stale
        edges (or none at all when the negated node had no qualifying outgoing arrow).
        """

        def merge(operands, is_neutral, combine):
            # Drop neutral operands from a multi-operand group, then rebuild one expression.
            if len(operands) > 1:
                operands = [op for op in operands if not is_neutral(op)]
            if len(operands) > 1:
                return combine(*operands)
            return operands[0]

        # First equation, nothing to do
        # Second equation and left side of third equation and left side of fourth equation
        negated_nodes = {
            node for node in self.nodes
            if node.is_negated() and not node.negated_domain
        }

        for neg_node in tqdm(negated_nodes, desc="Lemma 6: Processing negated nodes"):
            node = neg_node.get_operand()

            # Snapshot: the edges added below may touch the adjacency sets.
            for in_node in list(self.in_edges[neg_node]):
                if in_node.is_owl_nothing() or in_node.is_owl_thing():
                    continue
                if in_node.domain:
                    continue
                if node == in_node:
                    continue

                # X -> not A  gives  A -> not X
                neg_in_node = in_node.negate()
                self.add_edge(Edge(node, "saturation_lemma6", neg_in_node))

                intersection_owl = adapter.create_object_intersection_of(node.owl_class, in_node.owl_class)
                if len(intersection_owl.getNNF().getOperandsAsList()) == 1:
                    # Both operands collapsed into one: no proper intersection to add.
                    continue

                intersection = Node(owl_class = intersection_owl)

                assert node.domain == intersection.domain, f"Domain of {node} is {node.domain} and domain of {intersection} is {intersection.domain}"
                assert node.codomain == intersection.codomain, f"Codomain of {node} is {node.codomain} and codomain of {intersection} is {intersection.codomain}"
                assert node.negated_domain == intersection.negated_domain, f"Negated domain of {node} is {node.negated_domain} and negated domain of {intersection} is {intersection.negated_domain}"

                # X -> not A  gives  (A and X) -> bottom
                self.add_edge(Edge(intersection, "saturation_lemma6", bot_node))

            for out_node in list(self.out_edges[neg_node]):
                if out_node.is_owl_thing() or out_node.is_owl_nothing():
                    continue
                if out_node.domain:
                    continue
                if node == out_node:
                    continue

                union = adapter.create_object_union_of(node.owl_class, out_node.owl_class)
                if len(union.getNNF().getOperandsAsList()) == 1:
                    continue

                # not A -> Y  gives  top -> (A or Y)
                union = Node(owl_class = union)
                self.add_edge(Edge(top_node, "saturation_lemma6", union))

                #TODO last equation (Morgan law)

        # Right side of third equation: (L and R) -> bottom  gives  L -> not R
        intersection_nodes = {node for node in self.nodes if node.is_intersection()}

        for int_node in tqdm(intersection_nodes, desc="Lemma 6: Processing intersection nodes"):
            if bot_node not in self.out_edges[int_node]:
                continue

            operands = list(int_node.owl_class.getOperandsAsList())
            for left, right in pairs(operands):
                left = merge(left, lambda op: op.isOWLThing(), adapter.create_object_intersection_of)
                right = merge(right, lambda op: op.isOWLThing(), adapter.create_object_intersection_of)

                left_node = Node(owl_class = left)
                not_right_node = Node(owl_class = right.getObjectComplementOf())
                self.add_edge(Edge(left_node, "saturation_lemma6", not_right_node))

        # Right side of fourth equation: top -> (L or R)  gives  not L -> R
        union_nodes = {node for node in self.nodes if node.is_union()}

        for un_node in tqdm(union_nodes, desc="Lemma 6: Processing union nodes"):
            if top_node not in self.in_edges[un_node]:
                continue

            operands = un_node.owl_class.getOperandsAsList()
            for left, right in pairs(operands):
                left = merge(left, lambda op: op.isOWLNothing(), adapter.create_object_union_of)
                right = merge(right, lambda op: op.isOWLNothing(), adapter.create_object_union_of)

                not_left_node = Node(owl_class = left.getObjectComplementOf())
                right_node = Node(owl_class = right)
                self.add_edge(Edge(not_left_node, "saturation_lemma6", right_node))


    def _definition_6(self):
        # Def 6. Although it is not defined explicitely in the paper, this definition will look for classes that are subclass of a disjointness.
        for node in self.nodes:
            if node.in_relation_category():
                continue
            if node.is_owl_thing() or node.is_owl_nothing():
                continue
            
            node_to_neg = dict()
            for out_node in self.out_edges[node]:
                if out_node.domain or out_node.codomain:
                    continue
                if out_node.is_owl_thing() or out_node.is_owl_nothing():
                    continue

                
                if not out_node in node_to_neg and not out_node.negate() in node_to_neg:
                    node_to_neg[out_node] = None
                elif out_node in node_to_neg:
                    continue
                elif out_node.negate() in node_to_neg:
                    node_to_neg[out_node.negate()] = out_node
                    
            for node1, node2 in node_to_neg.items():
                if node2 is None:
                    continue
                else:
                    intersection = adapter.create_object_intersection_of(node1.owl_class, node2.owl_class)
                    intersection = Node(owl_class = intersection)
                    edge = Edge(intersection, "saturation_lemma6", bot_node)
                    self.add_edge(edge)
                    edge2 = Edge(node, "saturation_lemma6", intersection)
                    self.add_edge(edge2)
                                        
                            
    def _definition_7(self):
        """Apply the last equation of Definition 7 (the others are covered by Lemma 8).

        For every whole-relation node ``R`` and every relation-category node ``S`` with an
        arrow ``S -> R``: if the codomain node of ``S`` is registered, then for each plain
        class ``C`` it points to, the arrow ``domain(S) -> domain(exists R.C)`` is added.

        The adjacency sets are iterated through snapshots because :meth:`add_edge` mutates
        the graph while the loops run (as the other saturation rules already do).
        """
        #Last equation, other equations are covered in lemma 8
        relations = {node for node in self.nodes if node.is_whole_relation()}

        for rel in tqdm(relations, desc="Definition 7: Processing relations"):
            for in_rel in list(self.in_edges[rel]):
                if not in_rel.in_relation_category():
                    continue

                in_rel_codomain = in_rel.to_codomain()
                # Direct dict lookup: ``self.nodes`` would copy every key into a new set.
                if in_rel_codomain not in self.node_to_id:
                    continue

                # Same for every codomain successor, so compute it once.
                in_rel_domain = in_rel.to_domain()

                for cod in list(self.out_edges[in_rel_codomain]):
                    if cod.domain or cod.codomain or cod.in_relation_category():
                        continue

                    ex_rel_cod = adapter.create_object_some_values_from(rel.relation, cod.owl_class)
                    node = Node(owl_class = ex_rel_cod, relation = rel.relation, domain = True)
                    self.add_edge(Edge(in_rel_domain, "saturation_definition7", node))
                    
                
                    
    def _lemma_8(self):
        """Saturate with the monotonicity of existentials (Lemma 8 in [brieulle2022]_).

        For every arrow ``C -> D`` between plain object-category nodes (no intersections,
        unions, existentials, domain/codomain nodes; ``C`` is neither top nor bottom and
        ``D`` is not top) and every whole relation ``R`` in the graph, the arrow
        ``exists R.C -> exists R.D`` is added.

        NOTE(review): the previous code built ``exists R.D`` but never used it and added
        ``C -> exists R.C`` instead. The edge below follows the lemma and the sibling
        ``_lemma_8_bk``; confirm against the intended rule.
        """
        relations = {node for node in self.nodes if node.is_whole_relation()}

        for node in tqdm(self.nodes, desc="Lemma 8: Processing nodes"):
            if not node.in_object_category():
                continue
            if node.is_intersection() or node.is_union() or node.is_existential():
                continue
            if node.domain or node.codomain:
                continue
            if node.is_owl_thing() or node.is_owl_nothing():
                continue

            # Snapshot: add_edge below mutates the adjacency sets.
            for out_node in list(self.out_edges[node]):
                if not out_node.in_object_category():
                    continue
                if out_node.is_intersection() or out_node.is_union() or out_node.is_existential():
                    continue
                if out_node.domain or out_node.codomain:
                    continue
                if out_node.is_owl_thing():
                    continue

                for relation in relations:
                    ex_class = adapter.create_object_some_values_from(relation.relation, node.owl_class)
                    ex_out_class = adapter.create_object_some_values_from(relation.relation, out_node.owl_class)
                    ex_node = Node(owl_class = ex_class)
                    ex_out_node = Node(owl_class = ex_out_class)
                    # C -> D  gives  exists R.C -> exists R.D
                    self.add_edge(Edge(ex_node, "saturation_lemma8", ex_out_node))
                
    def _lemma_8_bk(self):
        existential_nodes = set()
        for node in self.nodes:
            if node.is_existential():
                existential_nodes.add(node)

        for node in existential_nodes:
            filler = node.owl_class.getFiller()
            property_ = node.owl_class.getProperty()
            filler_node = Node(owl_class = filler)
            domain_node = Node(owl_class = node.owl_class, relation = property_, domain = True)


            in_edges = list(self.in_edges[filler_node])
            for in_filler in in_edges:
                ex_in_class = adapter.create_object_some_values_from(node.owl_class.getProperty(), in_filler.owl_class)
                ex_in_node = Node(owl_class = ex_in_class)
                edge = Edge(ex_in_node, "saturation_lemma8", node)
                self.add_edge(edge)

                domain_in_node = Node(owl_class=ex_in_class, relation = property_, domain = True)
                edge = Edge(domain_in_node, "saturation_lemma8", domain_node)
                self.add_edge(edge)
                edge = Edge(domain_in_node, "saturation_lemma8", ex_in_node)
                self.add_edge(edge)
                edge = Edge(ex_in_node, "saturation_lemma8", domain_in_node)
                self.add_edge(edge)

            out_edges = list(self.out_edges[filler_node])
            for out_filler in out_edges:
                if out_filler.owl_class is None:
                    continue
                if out_filler.is_owl_nothing():
                    continue
                    edge = Edge(node, "saturation_lemma8", bot_node)
                    self.add_edge(edge)
                else:
                    ex_out_class = adapter.create_object_some_values_from(node.owl_class.getProperty(), out_filler.owl_class)
                    ex_out_node = Node(owl_class = ex_out_class)
                    edge = Edge(node, "saturation_lemma8", ex_out_node)
                    self.add_edge(edge)

                    domain_out_node = Node(owl_class=ex_out_class, relation = property_, domain = True)
                    edge = Edge(domain_node, "saturation_lemma8", domain_out_node)
                    self.add_edge(edge)
                    edge = Edge(domain_out_node, "saturation_lemma8", ex_out_node)
                    self.add_edge(edge)
                    edge = Edge(ex_out_node, "saturation_lemma8", domain_out_node)
                    self.add_edge(edge)

        
    def as_nx(self):
        """Build a ``networkx.DiGraph`` over the integer node ids.

        Every registered id becomes a vertex. An arrow is copied only when both endpoints
        are in the object category, its source is not the bottom node and its target is not
        the top node.

        :rtype: networkx.DiGraph
        """
        logging.debug("Converting to networkx")
        digraph = nx.DiGraph()
        digraph.add_nodes_from(list(self.node_to_id.values()))

        for origin, destination in self.as_edgelist():
            if not (origin.in_object_category() and destination.in_object_category()):
                continue
            # Arrows out of bottom / into top exist for every node and are left out.
            if origin == bot_node or destination == top_node:
                continue
            digraph.add_edge(self.node_to_id[origin], self.node_to_id[destination])

        logging.debug("Done converting to networkx")
        return digraph
                
    def transitive_closure(self):
        """Add to the graph every arrow of the transitive closure computed by NetworkX on
        the view returned by :meth:`as_nx`."""
        logging.debug("Computing transitive closure")
        closure = nx.transitive_closure(self.as_nx())
        logging.debug("Done computing transitive closure in NetworkX")

        for origin_id, destination_id in tqdm(closure.edges(), desc="Adding transitive closure edges to graph"):
            origin = self.id_to_node[origin_id]
            destination = self.id_to_node[destination_id]
            self.add_edge(Edge(origin, "transitive_closure", destination))

        logging.debug("Done computing transitive closure")

                
    def saturate(self, def_6 = True, lemma_6 = True, def_7 = True, lemma_8 = True):
        """
        This function saturates the graph using the rules defined in [brieulle2022]_.

        :param def_6: If ``True``, apply definition 6. Default is ``True``.
        :type def_6: bool, optional
        :param lemma_6: If ``True``, apply lemma 6. Default is ``True``.
        :type lemma_6: bool, optional
        :param def_7: If ``True``, apply definition 7. Default is ``True``.
        :type def_7: bool, optional
        :param lemma_8: If ``True``, apply lemma 8. Default is ``True``.
        :type lemma_8: bool, optional
        """

        if def_6:
            self._definition_6()

        if lemma_6:
            self._lemma_6()

        if def_7:
            self._definition_7()

        if lemma_8:
            self._lemma_8()

    def is_unsatisfiable(self, node):
        """Tell whether ``node`` has an arrow into the bottom node.

        :param node: A node registered in the graph.
        :type node: Node
        :returns: ``True`` if the bottom node is among the successors of ``node``.
        :rtype: bool
        :raises TypeError: If ``node`` is not a :class:`Node`.
        :raises ValueError: If ``node`` is not registered in the graph.
        """
        if not isinstance(node, Node):
            raise TypeError("node must be of type Node")
        # Direct dict lookup: ``self.nodes`` would copy every key into a new set per call.
        if node not in self.node_to_id:
            raise ValueError("Node is not in graph")
        return bot_node in self.out_edges[node]

    def get_unsatisfiable_nodes(self):
        """Return the set of predecessors of the bottom node, leaving out the trivially
        unsatisfiable ones (two-operand intersections of an expression and its complement).

        :rtype: set
        """

        def is_trivial_unsat(node):
            # Domain, codomain and relation-category nodes are never considered trivial.
            if node.domain or node.codomain or node.in_relation_category():
                return False

            expression = node.owl_class
            if expression.getClassExpressionType() != CT.OBJECT_INTERSECTION_OF:
                return False

            operands = expression.getOperandsAsList()
            if len(operands) != 2:
                return False

            first, second = tuple(operands)
            contradiction = False
            # Each complement found is unwrapped in place, so the second test compares
            # against the (possibly already unwrapped) other operand.
            if second.getClassExpressionType() == CT.OBJECT_COMPLEMENT_OF:
                second = second.getOperand()
                if first.equals(second):
                    contradiction = True
            if first.getClassExpressionType() == CT.OBJECT_COMPLEMENT_OF:
                first = first.getOperand()
                if second.equals(first):
                    contradiction = True
            return contradiction

        return {
            candidate
            for candidate in self._in_edges[bot_node]
            if not is_trivial_unsat(candidate)
        }


    def write_to_file(self, outfile):
        """Write the string edge list to ``outfile``, one tab-separated
        ``source, relation, target`` triple per line.

        :param outfile: Path of the file to create or overwrite.
        """
        with open(outfile, "w") as handle:
            triples = self.as_str_edgelist()
            handle.writelines(
                f"{src}\t{rel}\t{dst}\n"
                for src, rel, dst in tqdm(triples, desc="Writing to file")
            )

@versionadded(version="0.3.0")
class CategoricalProjector(ProjectionModel):
    """
    Implementation of projection rules defined in [zhapa2023]_. This class implements the
    projection of ALC axioms into a graph using categorical diagrams. Saturation steps can
    be applied following [brieulle2022]_.

    :param output_type: Either ``"str"`` (:meth:`project` returns the graph's string edge
        list) or ``"owl"`` (:meth:`project` returns a generator of edges between nodes).
    :type output_type: str
    :param saturation_steps: Number of saturation steps of the graph/category.
    :type saturation_steps: int
    :param transitive_closure: If ``True``, every saturation step computes the transitive edges.
    :type transitive_closure: bool
    :param def_6: If ``True``, apply definition 6. Default is ``True``.
    :type def_6: bool, optional
    :param lemma_6: If ``True``, apply lemma 6. Default is ``True``.
    :type lemma_6: bool, optional
    :param def_7: If ``True``, apply definition 7. Default is ``True``.
    :type def_7: bool, optional
    :param lemma_8: If ``True``, apply lemma 8. Default is ``True``.
    :type lemma_8: bool, optional
    :raises TypeError: If any parameter has the wrong type.
    :raises ValueError: If ``saturation_steps`` is negative or ``output_type`` is not
        ``"str"`` or ``"owl"``.
    """

    def __init__(self, output_type, saturation_steps = 0, transitive_closure = False, def_6 = True, lemma_6 = True, def_7 = True, lemma_8 = True):
        if not isinstance(output_type, str):
            raise TypeError(msg.type_error("output_type", "str", type(output_type)))
        if not isinstance(saturation_steps, int):
            raise TypeError(msg.type_error("saturation_steps", "int", type(saturation_steps), optional=True))
        if saturation_steps < 0:
            raise ValueError("Optional parameter saturation_steps must be non-negative")
        if not isinstance(transitive_closure, bool):
            raise TypeError(msg.type_error("transitive_closure", "bool", type(transitive_closure), optional=True))
        if not isinstance(def_6, bool):
            raise TypeError(msg.type_error("def_6", "bool", type(def_6), optional=True))
        if not isinstance(lemma_6, bool):
            raise TypeError(msg.type_error("lemma_6", "bool", type(lemma_6), optional=True))
        if not isinstance(def_7, bool):
            raise TypeError(msg.type_error("def_7", "bool", type(def_7), optional=True))
        if not isinstance(lemma_8, bool):
            raise TypeError(msg.type_error("lemma_8", "bool", type(lemma_8), optional=True))
        if output_type not in ["str", "owl"]:
            raise ValueError("Invalid output type. Must be 'str' or 'owl'")

        self.output_type = output_type
        self.saturation_steps = saturation_steps
        self.transitive_closure = transitive_closure
        self.def_6 = def_6
        self.lemma_6 = lemma_6
        self.def_7 = def_7
        self.lemma_8 = lemma_8

        self.adapter = OWLAPIAdapter()
        self.ont_manager = self.adapter.owl_manager
        self.data_factory = self.adapter.data_factory
        self.graph = Graph()

    def project(self, ontology):
        r"""Generates the projection of the ontology.

        A fresh :class:`Graph` replaces ``self.graph`` on every call.

        :param ontology: The ontology to be processed.
        :type ontology: :class:`org.semanticweb.owlapi.model.OWLOntology`
        :returns: The string edge list when ``output_type`` is ``"str"``, or a generator of
            edges when it is ``"owl"``.
        :raises TypeError: If ``ontology`` is not an ``OWLOntology``.
        """
        if not isinstance(ontology, OWLOntology):
            raise TypeError(
                "Parameter ontology must be of type org.semanticweb.owlapi.model.OWLOntology")

        all_classes = ontology.getClassesInSignature()

        # Collect the class assertion axioms of named classes as (individual, class) pairs.
        abox_edges = []
        for cls in all_classes:
            cls_str = str(cls.toStringID())
            for axiom in list(ontology.getClassAssertionAxioms(cls)):
                ind = str(axiom.getIndividual().toStringID())
                abox_edges.append((ind, cls_str))

        self.graph = Graph(abox_edges = abox_edges)

        for cls in tqdm(all_classes, desc="Adding nodes to graph"):
            self.graph.add_node(Node(owl_class = cls))

        all_axioms = ontology.getAxioms(True)
        for axiom in tqdm(all_axioms, total = len(all_axioms), desc="Processing axioms"):
            self.graph.add_all_edges(*list(self.process_axiom(axiom)))

        for _ in range(self.saturation_steps):
            self.graph.saturate(def_6 = self.def_6, lemma_6 = self.lemma_6, def_7 = self.def_7, lemma_8 = self.lemma_8)
            # The closure is recomputed after every saturation step, never on its own.
            if self.transitive_closure:
                self.graph.transitive_closure()

        if self.output_type == "str":
            return self.graph.as_str_edgelist()
        elif self.output_type == "owl":
            return self.graph.as_edges()
        else:
            raise ValueError("Invalid output type. Must be 'str' or 'owl'")

    def process_axiom(self, axiom):
        """Process an OWLAxiom and return a collection of edges.

        Axiom types listed in ``IGNORED_AXIOM_TYPES`` yield no edges silently; any other
        unsupported type is reported on standard output and yields no edges either.

        :param axiom: Axiom to be parsed.
        :type axiom: :class:`org.semanticweb.owlapi.model.OWLAxiom`
        """
        axiom_type = axiom.getAxiomType()

        if axiom_type == AxiomType.SUBCLASS_OF:
            return self._process_subclassof(axiom)
        elif axiom_type == AxiomType.EQUIVALENT_CLASSES:
            return self._process_equivalent_classes(axiom)
        elif axiom_type == AxiomType.DISJOINT_CLASSES:
            return self._process_disjointness(axiom)
        elif axiom_type == AxiomType.CLASS_ASSERTION:
            return self._process_class_assertion(axiom)
        elif axiom_type == AxiomType.OBJECT_PROPERTY_ASSERTION:
            return self._process_object_property_assertion(axiom)
        elif axiom_type in IGNORED_AXIOM_TYPES:
            # Ignore these types of axioms
            return []
        else:
            print(f"process_axiom: Unknown axiom type: {axiom_type}")
            return []

    def _process_equivalent_classes(self, axiom):
        """Process an EquivalentClasses axiom and return a set of edges."""
        edges = set()
        for subclass_axiom in axiom.asOWLSubClassOfAxioms():
            edges |= self._process_subclassof(subclass_axiom)

        if not edges:
            print(f"No edges found for EquivalentClasses axiom: {axiom}")
        return edges

    def _process_disjointness(self, axiom):
        """Process a disjointness axiom through its SubClassOf form and return a set of edges."""
        edges = set()
        for subclass_axiom in axiom.asOWLSubClassOfAxioms():
            edges |= self._process_subclassof(subclass_axiom)
        return edges

    def _process_class_assertion(self, axiom):
        """Process a class assertion axiom; the individual is represented as a class node."""
        individual = axiom.getIndividual()
        class_expression = axiom.getClassExpression()

        node, edges = self._process_expression_and_get_complex_node(class_expression)
        if node is None:
            # Unsupported class expression: nothing to assert against.
            return set()

        ind_as_class = adapter.create_class(str(individual.toStringID()))
        ind_node = Node(owl_class = ind_as_class, is_individual=True)
        edges.add(self._assertion_arrow(ind_node, node))
        return edges

    def _process_object_property_assertion(self, axiom):
        """Process an object property assertion axiom.

        The subject points to the domain of the property and to ``property some object``;
        the object points to the codomain of the property.
        """
        subject = axiom.getSubject()
        object_ = axiom.getObject()
        property_ = axiom.getProperty()

        subject_as_class = adapter.create_class(str(subject.toStringID()))
        object_as_class = adapter.create_class(str(object_.toStringID()))
        subject_node = Node(owl_class = subject_as_class, is_individual=True)
        object_node = Node(owl_class = object_as_class, is_individual=True)

        domain_node = Node(relation=property_, domain=True)
        codomain_node = Node(relation=property_, codomain=True)
        some_values_from = adapter.create_object_some_values_from(property_, object_as_class)

        edges = set()
        edges.add(self._subsumption_arrow(subject_node, domain_node))
        edges.add(self._subsumption_arrow(object_node, codomain_node))
        edges.add(self._subsumption_arrow(subject_node, some_values_from))
        return edges

    def _process_subclassof(self, axiom):
        """Process a SubClassOf axiom and return a set of edges.

        Besides ``sub -> super`` and the edges of both expressions, the arrow
        ``top -> (not sub or super)`` is added. An empty set is returned when either side
        is not supported.
        """
        sub_class = axiom.getSubClass()
        super_class = axiom.getSuperClass()

        sub_node, sub_edges = self._process_expression_and_get_complex_node(sub_class)
        super_node, super_edges = self._process_expression_and_get_complex_node(super_class)

        if (sub_node is None) or (super_node is None):
            return set()

        edges = set()
        edges.add(self._subsumption_arrow(sub_node, super_node))
        edges |= sub_edges
        edges |= super_edges

        not_sub_class = self.adapter.create_complement_of(sub_class)
        union = self.adapter.create_object_union_of(not_sub_class, super_class)
        union_complex_node, union_edges = self._process_expression_and_get_complex_node(union)
        if union_complex_node is not None:
            edges |= union_edges
            edges.add(self._subsumption_arrow(top_node, union_complex_node))
        return edges

    def _process_expression_and_get_complex_node(self, expression):
        """Translate a class expression into its node and the edges that define it.

        :param expression: The OWL class expression to process.
        :returns: A pair ``(node, edges)``. ``node`` is a :class:`Node` for named classes
            and the expression itself otherwise (the arrow helpers wrap it on use);
            ``(None, set())`` is returned when the expression, or any expression nested in
            it, is not supported.
        """
        expr_type = expression.getClassExpressionType()
        edges = set()

        if expr_type == CT.OWL_CLASS:
            return Node(owl_class=expression), edges

        if expr_type == CT.OBJECT_INTERSECTION_OF:
            prod_complex_node = expression
            for op in expression.getOperandsAsList():
                op_complex_node, op_edges = self._process_expression_and_get_complex_node(op)
                if op_complex_node is None:
                    return None, set()
                edges |= op_edges
                edges.add(self._product_limit_arrow(prod_complex_node, op_complex_node))
            #TODO: add arrows to fulfill distributivity
            return prod_complex_node, edges

        elif expr_type == CT.OBJECT_UNION_OF:
            coprod_complex_node = expression
            for op in expression.getOperandsAsList():
                op_complex_node, op_edges = self._process_expression_and_get_complex_node(op)
                # The node signals an unsupported operand; the edge set is never None.
                if op_complex_node is None:
                    return None, set()
                edges |= op_edges
                edges.add(self._coproduct_limit_arrow(op_complex_node, coprod_complex_node))
            return coprod_complex_node, edges

        elif expr_type == CT.OBJECT_SOME_VALUES_FROM:
            existential_complex_node = expression
            property_ = expression.getProperty()

            if not isinstance(property_, OWLObjectInverseOf):
                if property_.getEntityType() != EntityType.OBJECT_PROPERTY:
                    return None, set()
            else:
                if property_.isNamed():
                    property_ = property_.asOWLObject()
                else:
                    return None, set()

            filler = expression.getFiller()
            filler_node, filler_edges = self._process_expression_and_get_complex_node(filler)
            if filler_node is None:
                return None, set()
            edges |= filler_edges

            rel_exists_r_c = Node(owl_class = expression, relation=property_)
            codomain_rel_exists_r_c = Node(owl_class=expression, relation=property_, codomain=True)
            domain_rel_exists_r_c = Node(owl_class=expression, relation=property_, domain=True)

            edges.add(Edge(rel_exists_r_c, "http://general_arrow", Node(relation=property_)))
            edges.add(Edge(codomain_rel_exists_r_c, "http://general_arrow", Node(owl_class=filler)))

            # The existential and the domain of its relation point to each other.
            exist_node = Node(owl_class=existential_complex_node)
            edges.add(Edge(domain_rel_exists_r_c, "http://general_arrow", exist_node))
            edges.add(Edge(exist_node, "http://general_arrow", domain_rel_exists_r_c))
            return existential_complex_node, edges

        elif expr_type == CT.OBJECT_ALL_VALUES_FROM:
            universal_complex_node = expression
            property_ = expression.getProperty()
            filler = expression.getFiller()

            # The universal is processed through not(property some not(filler)).
            not_filler = self.adapter.create_complement_of(filler)
            rel_not_filler = self.adapter.create_object_some_values_from(property_, not_filler)
            not_rel_not_filler = self.adapter.create_complement_of(rel_not_filler)

            not_rel_not_filler_node, not_rel_not_filler_edges = self._process_expression_and_get_complex_node(not_rel_not_filler)
            if not_rel_not_filler_node is None:
                return None, set()

            filler_node, filler_edges = self._process_expression_and_get_complex_node(filler)
            if filler_node is None:
                return None, set()

            edges |= not_rel_not_filler_edges
            edges |= filler_edges
            edges.add(self._subsumption_arrow(universal_complex_node, not_rel_not_filler_node))
            edges.add(self._subsumption_arrow(not_rel_not_filler_node, universal_complex_node))

            not_domain_rel_n_filler = Node(owl_class=rel_not_filler, relation=property_, domain=True, negated_domain=True)
            edges.add(self._subsumption_arrow(not_domain_rel_n_filler, universal_complex_node))
            edges.add(self._subsumption_arrow(universal_complex_node, not_domain_rel_n_filler))
            return universal_complex_node, edges

        elif expr_type == CT.OBJECT_COMPLEMENT_OF:
            negation_complex_node = expression
            operand = expression.getOperand()
            operand_node, operand_edges = self._process_expression_and_get_complex_node(operand)
            # Same convention as the other branches: an unsupported operand makes the
            # whole expression unsupported.
            if operand_node is None:
                return None, set()

            union = Node(owl_class = self.adapter.create_object_union_of(expression, operand))
            intersection = Node(owl_class = self.adapter.create_object_intersection_of(expression, operand))

            edges |= operand_edges
            edges.add(Edge(intersection, "http://general_arrow", bot_node))
            edges.add(Edge(top_node, "http://general_arrow", union))
            return negation_complex_node, edges

        elif expr_type in IGNORED_EXPRESSION_TYPES:
            return None, set()

        else:
            print("process expression and get complex node: Unknown super class type: {}".format(expression))
            # Callers unpack the result, so report "unsupported" instead of a bare None.
            return None, set()

    ####################### MORPHISMS ###########################

    # decorator that transforms params as Node
    def _node_params(func):
        from functools import wraps  # local import: keeps the module's import block untouched

        @wraps(func)
        def wrapper(self, src, dst):
            if not isinstance(src, Node):
                src = Node(owl_class=src)
            if not isinstance(dst, Node):
                dst = Node(owl_class=dst)
            return func(self, src, dst)
        return wrapper

    @_node_params
    def _assertion_arrow(self, src, dst):
        rel = "http://type_arrow"
        return Edge(src, rel, dst)

    @_node_params
    def _subsumption_arrow(self, src, dst):
        rel = "http://subsumption_arrow"
        return Edge(src, rel, dst)

    @_node_params
    def _coproduct_limit_arrow(self, src, dst):
        rel = "http://injects"
        return Edge(src, rel, dst)

    @_node_params
    def _product_limit_arrow(self, src, dst):
        rel = "http://projects"
        return Edge(src, rel, dst)
def add_extra_existential_axioms(ontology):
    """Add, for every object property ``R`` and class ``C`` in the signature of
    ``ontology``, the axioms ``(R some C) SubClassOf TOP`` and ``BOT SubClassOf (R some C)``.

    The ontology is modified in place; the number of axioms before and after the update is
    printed.

    :param ontology: The ontology to extend.
    """
    adapter = OWLAPIAdapter()
    manager = adapter.owl_manager

    signature_classes = ontology.getClassesInSignature()
    signature_roles = ontology.getObjectPropertiesInSignature()

    bottom = adapter.create_class(BOT)
    top = adapter.create_class(TOP)

    # NOTE(review): HashSet is not imported in the visible part of this module; confirm it
    # is in scope at runtime.
    new_axioms = HashSet()  # noqa: F821 — Java class injected by JPype at runtime
    for role in signature_roles:
        for cls in signature_classes:
            existential = adapter.create_object_some_values_from(role, cls)
            below_top = adapter.create_subclass_of(existential, top)
            above_bottom = adapter.create_subclass_of(bottom, existential)
            new_axioms.add(below_top)
            new_axioms.add(above_bottom)

    print(f"Axioms before: {len(ontology.getAxioms())}")
    manager.addAxioms(ontology, new_axioms)
    print(f"Axioms after: {len(ontology.getAxioms())}")