Source code for skl2onnx.common._topology

# SPDX-License-Identifier: Apache-2.0


import re
import warnings
import pprint
from logging import getLogger
from collections import OrderedDict
import numpy as np
from onnx import onnx_pb as onnx_proto
from onnx.helper import make_graph, make_model, make_tensor_value_info
from onnxconverter_common.data_types import (  # noqa
    DataType,
    TensorType,
    FloatType,
    Int64Type,
    StringType,
    DictionaryType,
    FloatTensorType,  # noqa
    Int64TensorType,
    SequenceType,  # noqa
    StringTensorType,
    DoubleTensorType,
    Int32TensorType,
    BooleanTensorType,
    DoubleTensorType,
)

try:
    from onnxconverter_common.data_types import Int8TensorType, UInt8TensorType
except ImportError:
    Int8TensorType = None
    UInt8TensorType = None
from ..proto import get_opset_number_from_onnx, get_latest_tested_opset_version
from . import _registration
from . import utils
from .exceptions import MissingShapeCalculator, MissingConverter
from ._container import ModelComponentContainer, _build_options
from .onnx_optimisation_identity import onnx_remove_node_identity

# Alias of the builtin ``type``. Several signatures in this module take a
# parameter that is itself named ``type`` and therefore shadows the builtin
# inside their body; ``type_fct`` keeps the builtin reachable there.
type_fct = type


def _default_OPSET_TO_IR_VERSION():
    return {
        1: 3,
        2: 3,
        3: 3,
        4: 3,
        5: 3,
        6: 3,
        7: 3,
        8: 4,
        9: 4,
        10: 5,
        11: 6,
        12: 7,
        13: 7,
        14: 7,
        15: 8,
        16: 8,
        17: 8,
        18: 8,
        19: 9,
    }


# Prefer the table shipped with onnxconverter_common, but only when it is
# recent enough to know opset 18. Otherwise use the table defined in this
# module.
try:
    from onnxconverter_common.topology import OPSET_TO_IR_VERSION

    # Explicit check instead of ``assert``: assertions are removed when python
    # runs with -O, which would silently skip the lookup, and a failing
    # assertion would not be caught by the handler below.
    if OPSET_TO_IR_VERSION[18] is None:
        raise KeyError(18)
except (ImportError, KeyError):
    OPSET_TO_IR_VERSION = _default_OPSET_TO_IR_VERSION()

# NOTE(review): judging by the name, keys look like opsets of domain
# 'ai.onnx.ml' and values like opsets of the main domain -- confirm against
# the code using this table.
OPSET_ML_TO_OPSET = {1: 11, 2: 15, 3: 18}

# Logger used by this module, registered under the name "skl2onnx".
logger = getLogger("skl2onnx")


def get_default_opset_for_domain(domain):
    """
    Returns the default opset associated with a domain given
    the main opset (`__max_supported_opset__` of the package).

    :param domain: domain name, `''` is the main domain
    :return: the main opset for domain `''`,
        1, 2 or 3 for domain `'ai.onnx.ml'` depending on the main opset
        (3 if it is >= 16, 1 if it is < 6, 2 otherwise),
        1 for domain `'ai.onnx.training'`,
        None for any other domain
    """
    from .. import __max_supported_opset__ as main_opset

    if domain == "":
        opset = main_opset
    elif domain == "ai.onnx.ml":
        if main_opset >= 16:
            opset = 3
        elif main_opset < 6:
            opset = 1
        else:
            opset = 2
    elif domain == "ai.onnx.training":
        opset = 1
    else:
        opset = None
    return opset


class Variable:
    """
    Defines a variable which holds any data defined
    from *ONNX* types.

    A variable is also iterable and indexable as the pair
    `(onnx_name, type)`.
    """

    # Counter used to build a name when a placeholder name must be
    # replaced and no scope is available.
    _UNIQUE_NUMBER_ = 0

    def __init__(self, raw_name, onnx_name, scope, type=None):
        """
        :param raw_name: A string indicating the variable's name in the
            original model. Usually, it's the seed string used to created its
            ONNX name (i.e., the field *onnx_name* below).
        :param onnx_name: A string indicating the variable's name in
            the converted model. A name following pattern `u(...)` is
            replaced by a generated name, any other name containing `(`
            is rejected.
        :param scope: A string. It's the name of the scope where this
            variable is declared
        :param type: A type object defined in .common.data_types.py;
            e.g., FloatTensorType
        :raises TypeError: if *raw_name* or *onnx_name* is not a string,
            if *onnx_name* contains `(` without following pattern `u(...)`,
            if *type* has no attribute `shape` or is not a `DataType`,
            if the shape of a tensor type is not made of integers
        :raises RuntimeError: if one dimension of the shape is 0
        """
        if not isinstance(raw_name, str):
            raise TypeError("raw_name must be a string not '%s'." % raw_name.__class__)
        if type is not None and not hasattr(type, "shape"):
            raise TypeError(
                "Unexpected type for variable raw_name=%r, type=%r." % (raw_name, type)
            )
        # The type is checked first: calling startswith on something which
        # is not a string would raise AttributeError instead of the
        # expected TypeError.
        if not isinstance(onnx_name, str):
            raise TypeError("onnx_name must be a string not %r." % onnx_name)
        if "(" in onnx_name:
            if onnx_name.startswith("u(") and onnx_name[-1] == ")":
                # Placeholder name, it is replaced by a generated one.
                onnx_name0 = onnx_name
                if scope is None:
                    onnx_name = "UU%03dUU" % Variable._UNIQUE_NUMBER_
                    Variable._UNIQUE_NUMBER_ += 1
                else:
                    onnx_name = scope.get_unique_variable_name("U")
                logger.debug(
                    "[Var] rename raw_name=%r, onnx_name=%r into %r",
                    raw_name,
                    onnx_name0,
                    onnx_name,
                )
            else:
                raise TypeError("onnx_name must be a string not %r." % onnx_name)

        if type is not None:
            shape = type.shape
            if shape is not None:
                # Unknown dimensions (None) are ignored, a known
                # dimension cannot be null.
                not_none = [v for v in shape if v is not None]
                if len(not_none) and min(not_none) == 0:
                    raise RuntimeError(
                        "A variable cannot be empty, raw_name=%r, "
                        "onnx_name=%r, shape=%r, type=%r."
                        % (raw_name, onnx_name, shape, type)
                    )

        self._raw_name = raw_name
        self._onnx_name = onnx_name
        self._scope = scope
        self._type = type
        self._parent = None

        # The following fields are bool variables used in parsing and
        # compiling stages
        self._is_fed = None
        self._is_root = None
        self._is_leaf = None

        if self.type is not None and not isinstance(self.type, DataType):
            raise TypeError("shape must be a DataType not {}.".format(self.type))
        if isinstance(self.type, TensorType):
            shape = self.type.shape
            if not isinstance(shape, (list, tuple)):
                try:
                    shape = list(shape)
                except TypeError:
                    # parameter *type* hides the builtin, hence type_fct
                    raise TypeError(
                        "shape must be a tuple or a list not "
                        "{}.".format(type_fct(shape))
                    )
            for dim in shape:
                if dim is None:
                    continue
                if not isinstance(dim, (int, np.int32, np.int64, np.intc)):
                    raise TypeError(
                        "shape must contains integers not %r (type=%r)."
                        "" % (dim, dim.__class__)
                    )
        logger.debug("[Var] +%s", self)

        # links to operators using those variables
        self.operators_outputs_ = []
        self.operators_inputs_ = []
        self._check()

    def _check(self):
        """
        Checks every known dimension of the shape is an integer.

        :raises ValueError: if a dimension is neither None nor an integer
        """
        if self.type is not None and self.type.shape is not None:
            for k in self.type.shape:
                if k is None:
                    continue
                if not isinstance(k, (int, np.integer)):
                    raise ValueError(
                        "Unexpected type %r for shape %r."
                        "" % (type(k), self)
                    )

    @property
    def raw_name(self):
        "Name of the variable in the original model."
        return self._raw_name

    @property
    def onnx_name(self):
        "Name of the variable in the converted model."
        return self._onnx_name

    @property
    def scope(self):
        "Scope given to the constructor."
        return self._scope

    @property
    def type(self):
        "Type of the variable, it can be None."
        return self._type

    @property
    def is_fed(self):
        "Status updated by method `init_status`."
        return self._is_fed

    @property
    def is_root(self):
        "Status updated by method `init_status`."
        return self._is_root

    @property
    def is_leaf(self):
        "Status updated by method `init_status`."
        return self._is_leaf

    def init_status(self, is_fed=None, is_root=None, is_leaf=None):
        """
        Updates the status of the variable. A value equal to None
        leaves the corresponding status unchanged, every change is logged.
        """
        if is_fed is not None and is_fed != self.is_fed:
            logger.debug(
                "[Var] update is_fed=%r for %r, parent=%r", is_fed, self, self._parent
            )
            self._is_fed = is_fed
        if is_root is not None and is_root != self.is_root:
            logger.debug("[Var] update is_root=%r for %r", is_root, self)
            self._is_root = is_root
        if is_leaf is not None and is_leaf != self.is_leaf:
            logger.debug("[Var] update is_leaf=%r for %r", is_leaf, self)
            self._is_leaf = is_leaf

    def __setattr__(self, name, value):
        """
        Protects the public attributes: *type* goes through `set_type`,
        *onnx_name*, *is_fed*, *is_root*, *is_leaf*, *scope*, *raw_name*
        cannot be assigned directly. Any other attribute is stored
        in `__dict__`.
        """
        if name == "type":
            self.set_type(value)
        elif name == "onnx_name":
            raise AttributeError("You must use method set_onnx_name.")
        elif name in {"is_fed", "is_root", "is_leaf"}:
            raise AttributeError("You must use method init_status.")
        elif name in {"scope", "raw_name"}:
            raise AttributeError("scope or raw_name cannot be changed.")
        self.__dict__[name] = value

    def set_type(self, new_type):
        """
        Changes the type of the variable and checks its shape.

        :param new_type: new type, it must define attribute `shape`
        :raises TypeError: if *new_type* is None, a string, a variable
            or has no attribute `shape`
        """
        if (
            new_type is None
            or isinstance(new_type, (str, Variable))
            or not hasattr(new_type, "shape")
        ):
            raise TypeError(
                "Unexpected new type for variable %r, new_type=%r."
                % (self, new_type)
            )
        logger.debug("[Var] update type for %r", self)
        self._type = new_type
        self._check()

    def set_onnx_name(self, onnx_name):
        """
        Renames the variable. The scope is told about the change
        when it is neither None nor a string.
        """
        if onnx_name != self._onnx_name:
            logger.debug(
                "[Var] update onnx_name, from %r to %r in %r",
                self.onnx_name,
                onnx_name,
                self,
            )
            if self.scope is not None and not isinstance(self.scope, str):
                self.scope.rename_onnx_name(self._onnx_name, onnx_name)
            self._onnx_name = onnx_name

    def set_parent(self, operator):
        """
        Registers the operator which produces this variable.

        :raises RuntimeError: if a parent was already registered
        """
        if self._parent is not None:
            raise RuntimeError(
                "This variable is already the output of operator %r. "
                "It cannot be the output of %r." % (self._parent, operator)
            )
        logger.debug("[Var] set parent for %r, parent=%r", self, operator)
        self._parent = operator

    def get_first_dimension(self):
        """
        Returns the first dimension (batch dimension) or
        None if not specified (shape is empty).
        """
        if self.type is None or self.type.shape is None or len(self.type.shape) == 0:
            return None
        return self.type.shape[0]

    def get_second_dimension(self):
        """
        Returns the second dimension or None if the type or the shape
        is missing or if the shape has less than two dimensions.
        """
        if self.type is None or self.type.shape is None or len(self.type.shape) < 2:
            return None
        return self.type.shape[1]

    @property
    def full_name(self):
        """
        Return a globally unique variable ID
        """
        return self.onnx_name

    def __repr__(self):
        return "Variable('{0}', '{1}', type={2})".format(
            self.raw_name, self.onnx_name, self.type
        )

    @staticmethod
    def from_pb(obj):
        """
        Creates a variable from a protobuf object.

        :param obj: a protobuf object defining `name` and `type`
            or a container of such objects (anything with method `extend`)
        :return: a :class:`Variable` or a list of :class:`Variable`
        :raises NotImplementedError: if the element type is not supported
        """

        def get_dim(d):
            r = d.dim_value
            if "dim_param" in str(d):
                return None
            if r == 0:
                # dim_value is 0 when it is 0 or undefined
                return 0 if "0" in str(d) else None
            return r

        def get_shape(tt):
            return [get_dim(tt.shape.dim[i]) for i in range(len(tt.shape.dim))]

        if hasattr(obj, "extend"):
            return [Variable.from_pb(o) for o in obj]

        name = obj.name
        if obj.type.tensor_type:
            tt = obj.type.tensor_type
            elem = tt.elem_type
            shape = get_shape(tt)
            if elem == onnx_proto.TensorProto.FLOAT:
                ty = FloatTensorType(shape)
            elif elem == onnx_proto.TensorProto.BOOL:
                ty = BooleanTensorType(shape)
            elif elem == onnx_proto.TensorProto.DOUBLE:
                ty = DoubleTensorType(shape)
            elif elem == onnx_proto.TensorProto.STRING:
                ty = StringTensorType(shape)
            elif elem == onnx_proto.TensorProto.INT64:
                ty = Int64TensorType(shape)
            elif elem == onnx_proto.TensorProto.INT32:
                ty = Int32TensorType(shape)
            elif UInt8TensorType is not None and elem == onnx_proto.TensorProto.UINT8:
                ty = UInt8TensorType(shape)
            elif Int8TensorType is not None and elem == onnx_proto.TensorProto.INT8:
                ty = Int8TensorType(shape)
            elif elem == 0:
                # element type 0 is handled as a float tensor
                ty = FloatTensorType(shape)
            else:
                raise NotImplementedError(
                    "Unsupported type '{}' (elem_type={}).".format(
                        type(obj.type.tensor_type), elem
                    )
                )
        else:
            raise NotImplementedError(
                "Unsupported type '{}' as "
                "a string ({}).".format(type(obj), obj)
            )

        return Variable(name, name, None, ty)

    def __iter__(self):
        "Enables expression such as `a,b = self`."
        yield self.onnx_name
        yield self.type

    def __getitem__(self, index):
        "Returns *onnx_name* for index 0, *type* for index 1."
        if index == 0:
            return self.onnx_name
        if index == 1:
            return self.type
        raise IndexError("Unreachable element at index %d." % index)

    def add_operator(self, op, in_or_out):
        "Add a link to an operator, True for output, False for input."
        if in_or_out:
            self.operators_outputs_.append(op)
        else:
            self.operators_inputs_.append(op)

    def check_compatible_type(self, other_type):
        """
        Checks *other_type* is compatible with the type of the variable.
        Both types are compatible if both are None or if the type of
        the variable is an instance of the class of *other_type* and
        the shapes are equal or the shape of *other_type* is empty.

        :raises TypeError: if the types are not compatible
        """

        def empty_shape(shape):
            return shape is None or len(shape) == 0

        if self.type is None:
            if other_type is None:
                return
        elif other_type is not None:
            if isinstance(self.type, type(other_type)):
                if self.type.shape == other_type.shape:
                    return
                if empty_shape(other_type.shape):
                    return
        raise TypeError(
            "Incompatible type for variable %r and type %r." % (self, other_type)
        )
class VariableStr(Variable):
    """
    Defines a variable a string. This should be avoided.
    The same string is used as raw name and as ONNX name.
    """

    def __init__(self, name, scope=None, type=None):
        super().__init__(name, name, scope=scope, type=type)

    @property
    def raw_name(self):
        return self._raw_name

    @property
    def onnx_name(self):
        # A name still starting with 'u(' means the variable
        # was not renamed.
        current = self._onnx_name
        if not current.startswith("u("):
            return current
        raise RuntimeError("Variable should be renamed as onnx_name=%r." % current)
class Operator:
    """
    Defines an operator available in *ONNX*.
    Inputs and outputs are stored in instances of
    :class:`Operator.OperatorList`.
    """

    class OperatorList(list):
        """
        List of variables attached to an operator (*parent*),
        *kind* is `'In'` for inputs, `'Out'` for outputs.
        """

        def __init__(self, parent, kind):
            list.__init__(self)
            self.parent = parent
            self.kind = kind

        def __eq__(self, second):
            raise NotImplementedError("Operator equal not implemented and not needed.")

        def append(self, v):
            """
            Adds a variable and links it to the parent operator.
            An output variable also gets the operator as its parent.
            """
            if not isinstance(v, Variable):
                raise TypeError(
                    "Input and output must be of type Variable not %r." % type(v)
                )
            kind = self.kind
            if kind == "Out":
                v.set_parent(self.parent)
            list.append(self, v)
            logger.debug("[Op] add %s %r to %r", self.kind, v, self.parent)
            # The kind is validated after the insertion.
            if kind not in ("In", "Out"):
                raise RuntimeError("Unexpected value for kind=%r." % self.kind)
            v.add_operator(self.parent, kind == "Out")

        def extend(self, vs):
            "Appends every variable with method `append`."
            for item in vs:
                self.append(item)

        def __getitem__(self, i):
            item = list.__getitem__(self, i)
            if isinstance(i, int) and not isinstance(item, Variable):
                raise TypeError(
                    "Element %d must be a Variable not %r." % (i, type(item))
                )
            return item

        def __setitem__(self, i, v):
            raise LookupError("Setter should not be used to modify an element.")

        def set_element(self, i, v):
            "Updates element i."
            if not isinstance(v, Variable):
                raise TypeError("Value v must be a Variable not %r." % type(v))
            logger.debug(
                "[Op] %s-change element %d from %r to %r in %r",
                self.kind,
                i,
                self[i],
                v,
                self.parent,
            )
            list.__setitem__(self, i, v)

        def to_string(self):
            "Returns the names of the elements separated by commas."
            return ",".join(
                o.onnx_name if hasattr(o, "onnx_name") else '"%s"' % str(o)
                for o in self
            )

    def __init__(self, onnx_name, scope, type, raw_operator, target_opset, scope_inst):
        """
        :param onnx_name: A unique ID, which is a string
        :param scope: The name of the scope where this operator is
            declared. It's a string.
        :param type: A object which uniquely characterizes the type of
            this operator. For example, it can be a string,
            pooling, if this operator is associated with a
            CoreML pooling layer.
        :param raw_operator: The original operator which defines this operator;
            for example, a scikit-learn Imputer and a CoreML Normalizer.
        :param target_opset: The target opset number for the converted model.
        :param scope_inst: :class:`Scope` instance the operator belongs to
        """
        if isinstance(raw_operator, str):
            raise RuntimeError(
                "Parameter raw_operator must be an object not "
                "a string '{0}'.".format(raw_operator)
            )
        # operator name in the converted model, if raw_operator
        # is not None, output_shapes can be guessed
        # from the raw model. Otherwise, it can be guessed
        # from the input shapes.
        self.onnx_name = onnx_name
        self.scope = scope
        self.type = type
        self.raw_operator = raw_operator
        self.inputs = Operator.OperatorList(self, "In")
        self.outputs = Operator.OperatorList(self, "Out")
        self._is_evaluated = None
        self.target_opset = target_opset
        self.scope_inst = scope_inst
        logger.debug("[Op] +%r", self)

    def new_raw_operator(self, raw_operator, alias):
        """
        Returns a shallow copy of this operator,
        changes the raw_operator but keeps the same inputs
        and outputs.
        """
        clone = Operator(
            onnx_name=self.onnx_name,
            scope=self.scope,
            type=alias,
            raw_operator=raw_operator,
            target_opset=self.target_opset,
            scope_inst=self.scope_inst,
        )
        clone.inputs = self.inputs
        clone.outputs = self.outputs
        return clone

    def __repr__(self):
        try:
            textop = repr(self.raw_operator)
        except AttributeError:
            textop = "MISSING OP"
        except KeyError:
            # The line above fails for python 3.7
            textop = type(self.raw_operator)
        if isinstance(textop, str) and "\n" in textop:
            # keeps the representation on a single line
            textop = textop.replace("\n", "").replace(" ", "")
        template = (
            "Operator(type='{0}', onnx_name='{1}', inputs='{2}', "
            "outputs='{3}', raw_operator={4})"
        )
        return template.format(
            self.type,
            self.onnx_name,
            self.inputs.to_string(),
            self.outputs.to_string(),
            textop,
        )

    def __setattr__(self, name, value):
        """
        Attributes *inputs* and *outputs* only accept instances of
        :class:`Operator.OperatorList`, a plain list is converted.
        Every variable of the list is linked to this operator.
        """
        if name in ("inputs", "outputs"):
            is_output = name == "outputs"
            if not isinstance(value, Operator.OperatorList):
                if not isinstance(value, list):
                    raise TypeError(
                        "inputs or outputs must be of type Operator.OperatorList."
                    )
                # A plain list is replaced by an empty OperatorList
                # filled afterwards, every element is then checked
                # and linked by method append.
                setattr(
                    self,
                    name,
                    Operator.OperatorList(self, "Out" if is_output else "In"),
                )
                getattr(self, name).extend(value)
                return
            for variable in value:
                variable.add_operator(self, is_output)
        self.__dict__[name] = value

    @property
    def is_evaluated(self):
        "Status updated by method `init_status`."
        return self._is_evaluated

    def init_status(self, is_evaluated=None):
        "Updates the status unless the value is None or unchanged."
        if is_evaluated is None or not (is_evaluated != self.is_evaluated):
            return
        logger.debug("[Op] update is_evaluated=%r for %r", is_evaluated, self)
        self._is_evaluated = is_evaluated

    @property
    def full_name(self):
        """
        Return a globally unique operator ID
        """
        return self.onnx_name

    @property
    def input_full_names(self):
        """
        Return all input variables' names
        """
        return [var.full_name for var in self.inputs]

    @property
    def output_full_names(self):
        """
        Return all output variables' names
        """
        return [var.full_name for var in self.outputs]

    @property
    def original_operator(self):
        """
        Return the original operator/layer
        """
        return self.raw_operator

    def infer_types(self):
        """
        Retrieves the shape calculator registered for the type
        of this operator and calls it on this operator.

        :raises MissingShapeCalculator: if the type is None or
            if no shape calculator can be found for it
        """
        # Invoke a core inference function
        if self.type is None:
            raise MissingShapeCalculator(
                "Unable to find a shape calculator for type '{}'.".format(
                    type(self.raw_operator)
                )
            )
        try:
            calculator = _registration.get_shape_calculator(self.type)
        except ValueError:
            raise MissingShapeCalculator(
                "Unable to find a shape calculator for alias '{}' "
                "and type '{}'.".format(self.type, type(self.raw_operator))
            )
        if calculator is None:
            raise MissingShapeCalculator(
                "Unexpected shape calculator for alias '{}' "
                "and type '{}'.".format(self.type, type(self.raw_operator))
            )
        fed_inputs = "".join(str(i.is_fed) for i in self.inputs)
        fed_outputs = "".join(str(i.is_fed) for i in self.outputs)
        logger.debug("[Shape-a] %r fed %r - %r", self, fed_inputs, fed_outputs)
        calculator(self)
        logger.debug(
            "[Shape-b] %r inputs=%r - outputs=%r", self, self.inputs, self.outputs
        )
[docs]class Scope: """ Every node of an *ONNX* graph must be unique. This class holds the list of existing name for every node already defined in graph. It also provides functions to create a unique unused name. """ def __init__( self, name, target_opset=None, custom_shape_calculators=None, options=None, registered_models=None, naming=None, ): """ :param name: A string, the unique ID of this scope in a Topology object :param target_opset: The target opset number for the converted model. :param custom_conversion_functions: a dictionary for specifying the user customized conversion function :param custom_shape_calculators: a dictionary for specifying the user customized shape calculator :param options: see :ref:`l-conv-options` :param naming: the user may want to change the way intermediate are named, this parameter can be a string (a prefix) or a function, which signature is the following: `get_name(name, existing_names)`, the library will then check this name is unique and modify it if not :param registered_models: registered models .. versionchanged:: 1.10.0 Parameter *naming* was added. """ self.name = name self.onnx_variable_names = set() self.onnx_operator_names = set() self.target_opset = target_opset self.custom_shape_calculators = custom_shape_calculators # An one-to-many map from raw variable name to ONNX variable # names. It looks like # (key, value) = (raw_name, [onnx_name, onnx_name1, onnx_name2, ..., onnx_nameN]) # noqa # The last name may hide all other names in this scope. self.variable_name_mapping = {} # A map of local variables defined in this scope. # (key, value) = (onnx_name, variable) self.variables = OrderedDict() self.input_variables = [] self.output_variables = [] # A map of local operators defined in this scope. # (key, value) = (onnx_name, operator) self.operators = {} # Additional options given to converters. 
self.options = options # Registered models self.registered_models = registered_models self.naming = naming if naming is None: self._naming = Topology._generate_unique_name elif isinstance(naming, str): self._naming = lambda seed, names: Topology._generate_unique_name( self.naming + seed, names ) elif callable(self.naming): self._naming = lambda seed, names: Topology._generate_unique_name( self.naming(seed, names), names ) else: raise TypeError("Unexpected type for parameter naming: %r." % type(naming)) def get(self, var_name, default_value): "Returns variable with 'name' or default value is not found." return self.variables.get(var_name, default_value) def has_variable_name(self, name): """ Tells if a variable is already registered. """ return name in self.onnx_variable_names def get_shape_calculator(self, model_type): """ Returns the shape calculator for the given model type. :param model_type: model type such as *LogisticRegression* :return: alias or None if not found """ return self.custom_shape_calculators.get(model_type, None)
def get_unique_variable_name(self, seed, rename=True):
    """
    Creates a unique variable ID based on the given seed.

    :param seed: a string used to build the name
    :param rename: if True, the name is built by the naming function
        of the scope (`self._naming`), otherwise by
        `Topology._generate_unique_name`
    :return: the name returned by the selected function called with
        the seed and the variable names already known by the scope
    :raises TypeError: if *seed* is not a string
    """
    if not isinstance(seed, str):
        raise TypeError("Parameter seed must be a string not {}.".format(type(seed)))
    build_name = self._naming if rename else Topology._generate_unique_name
    return build_name(seed, self.onnx_variable_names)
def get_unique_operator_name(self, seed):
    """
    Creates a unique operator ID based on the given seed.

    :param seed: value given to the naming function of the scope
    :return: the name returned by the naming function of the scope
        (`self._naming`) called with the seed and the operator names
        already known by the scope
    """
    known_names = self.onnx_operator_names
    return self._naming(seed, known_names)
def declare_local_variable( self, raw_name, type=None, prepend=False, missing_type=False, rename=True ): """ This function may create a new variable in this scope. If *raw_name* has been used to create other variables, the new variable will hide all other variables created using *raw_name*. """ if type is None and not missing_type: raise RuntimeError("Unknown type for %r (type=%r)." % (raw_name, type)) # Get unique ID for the new variable onnx_name = self.get_unique_variable_name(raw_name, rename=rename) # Create the variable variable = Variable(raw_name, onnx_name, self.name, type) self.register_variable(variable, prepend=prepend) return variable def register_variable(self, var, prepend=False): "Adds a variable to the scope." if var.onnx_name in self.variables: raise RuntimeError( "Variable %r already registered (other=%r)." % (var, self.variables[var.onnx_name]) ) if var.raw_name in self.variable_name_mapping: # Hide existing variables with the same raw_name if not prepend: self.variable_name_mapping[var.raw_name].append(var.onnx_name) else: self.variable_name_mapping[var.raw_name].insert(0, var.onnx_name) else: self.variable_name_mapping[var.raw_name] = [var.onnx_name] self.variables[var.onnx_name] = var def declare_existing_subgraph_name(self, graph_proto): """ Declare all name from a subgraph in order to avoid being picked twice. """ output_name = {o.name for o in graph_proto.output} for node in graph_proto.node: for name in node.output: if name in output_name: continue if self.has_variable_name(name): raise NameError( "Result name %r is already taken (outputs=%r) " "(node=%r)." % (name, output_name, node) ) self.onnx_variable_names.add(name) if node.name in self.onnx_operator_names: raise NameError( "Operator name %r is already taken " "(node=%r)." % (node.name, node) ) self.onnx_operator_names.add(node.name) def rename_onnx_name(self, old_name, new_name): if new_name in self.variables: raise RuntimeError( "Name %r already in variables (%r)." 
% (new_name, self.variables[new_name]) ) if old_name not in self.variables: raise RuntimeError("Unable to find name %r in variables." % old_name) logger.debug("[Scope] update onnx_name, from %r to %r", old_name, new_name) self.variables[new_name] = self.variables[old_name] del self.variables[old_name] def declare_local_input(self, raw_name, type=None, prepend=False, rename=True): """ Calls `declare_local_variable`. Registers this variable as an input. """ var = self.declare_local_variable( raw_name, type=type, prepend=prepend, rename=rename ) self.input_variables.append(var) return var def declare_local_output( self, raw_name, type=None, prepend=False, missing_type=False ): """ Calls `declare_local_variable`. Registers this variable as an output. """ var = self.declare_local_variable( raw_name, type=type, prepend=prepend, missing_type=missing_type ) self.output_variables.append(var) return var def declare_local_operator(self, type, raw_model=None): """ This function is used to declare new local operator. 
""" onnx_name = self.get_unique_operator_name(str(type)) operator = Operator( onnx_name, self.name, type, raw_model, self.target_opset, scope_inst=self ) self.operators[onnx_name] = operator return operator def _get_allowed_options(self, model, fail=True): if self.registered_models is not None: if type(model) not in self.registered_models["aliases"]: if fail: raise NotImplementedError( "No registered models, no known allowed options " "for model '{}'.".format(model.__class__.__name__) ) return {} alias = self.registered_models["aliases"][type(model)] conv = self.registered_models["conv"][alias] allowed = conv.get_allowed_options() return allowed raise NotImplementedError( "No registered models, no known allowed options " "for model '{}'.".format(model.__class__.__name__) ) def add_options(self, model_id, options): """ Adds an option, for example, ``add_options(id(clr), {'raw_scores': True})`` tells the converter associated to ``clr`` to use raw score instead of probabilities. :param model_id: class or ``id(instance)`` :param options: dictionary with the new values """ if options is None: return if self.options is None: self.options = {} if model_id not in self.options: self.options[model_id] = None if self.options[model_id] is None: self.options[model_id] = {} self.options[model_id].update(options) def get_options(self, model, default_values=None, fail=True): """ Returns additional options for a model. It first looks by class then by id (``id(model)``). :param model: model being converted :param default_values: default options (it is modified by the function) :param fail: fails if option it not found :return: dictionary """ return _build_options( model, self.options, default_values, self._get_allowed_options(model, fail=fail), fail=fail, ) def replace_raw_operator(self, op1, op2, alias): """ Replaces every raw operator op1 by op2. The function uses `id()` to detect op1. 
""" for v in self.operators.values(): if id(v.raw_operator) == id(op1): logger.debug( "[Scope] replace %d by %d in %r.", id(v.raw_operator), id(op1), v ) v.raw_operator = op2 v.type = alias
class Topology:
    """
    Holds instances on :class:`Scope <skl2onnx.common._topology.Scope>` and
    :class:`SklearnModelContainer
    <skl2onnx.common._container.SklearnModelContainer>`.
    These are filled by the converters while a pipeline is being converted.
    """

    def __init__(
        self,
        model,
        default_batch_size=1,
        initial_types=None,
        target_opset=None,
        custom_conversion_functions=None,
        custom_shape_calculators=None,
        registered_models=None,
    ):
        """
        Initializes a *Topology* object, which is an intermediate
        representation of a computational graph.

        :param model: RawModelContainer object or one of its derived
            classes. It contains the original model.
        :param default_batch_size: batch_size prepend to scalar and
            array types from CoreML. It's usually 1 or None.
        :param initial_types: A list providing some types for some
            root variables. Each element is a tuple of a variable
            name and a type defined in *data_types.py*.
        :param target_opset: the target opset, it is given to the
            scope created by :meth:`declare_scope`
        :param custom_conversion_functions: a dictionary for
            specifying the user customized conversion function
        :param custom_shape_calculators: a dictionary for specifying
            the user customized shape calculator
        :param registered_models: registered models, it cannot be None
        :raises TypeError: if a key of *custom_conversion_functions*
            or *custom_shape_calculators* is not callable
        :raises AssertionError: if *registered_models* is None
        """
        self.scopes = []
        self.raw_model = model
        self.scope_names = set()
        self.initial_types = initial_types if initial_types else list()
        self.default_batch_size = default_batch_size
        self.target_opset = target_opset
        self.custom_conversion_functions = (
            custom_conversion_functions if custom_conversion_functions else {}
        )
        self.custom_shape_calculators = (
            custom_shape_calculators if custom_shape_calculators else {}
        )

        for k in self.custom_conversion_functions:
            if not callable(k):
                raise TypeError(
                    "Keys in custom_conversion_functions must be types not strings."
                )
        for k in self.custom_shape_calculators:
            if not callable(k):
                raise TypeError(
                    "Keys in custom_shape_calculators must be types not strings."
                )

        # A map of local overwritten model aliases.
        self.model_aliases = {}
        all_model_types = set(self.custom_conversion_functions) | set(
            self.custom_shape_calculators
        )
        for mtype in all_model_types:
            alias = "{}_{}".format(mtype.__name__, id(self))
            self.model_aliases[mtype] = alias

        # Registered models
        if registered_models is None:
            raise AssertionError()
        self.registered_models = registered_models

    @property
    def scope(self):
        "Returns the unique scope, fails if there is not exactly one."
        if len(self.scopes) != 1:
            raise RuntimeError("Only one scope is allowed not %d." % len(self.scopes))
        return self.scopes[0]

    @staticmethod
    def _generate_unique_name(seed, existing_names):
        """
        Produce an unique string based on the seed

        :param seed: a string
        :param existing_names: a set containing strings which cannot be
            produced, the returned name is added to it
        :return: a string similar to the seed
        :raises ValueError: if *seed* is empty
        """
        if seed == "":
            raise ValueError("Name seed must be a non-empty string.")

        # Make the seed meet C-style naming convention:
        # every character which is not a word character is replaced
        # by '_'. Character '+' belongs to the character class of the
        # pattern and is therefore kept as well.
        seed = re.sub("[^\\w+]", "_", seed)
        # The first symbol cannot be a number
        if re.match("^[0-9]", seed):
            seed = "_" + seed

        # If seed has never been seen, we return it as it is. Otherwise,
        # we will append an number to make it unique.
        if seed not in existing_names:
            existing_names.add(seed)
            return seed

        i = 1
        while seed + str(i) in existing_names:
            i += 1
        new_name = seed + str(i)
        existing_names.add(new_name)
        return new_name

    def get_unique_scope_name(self, seed):
        "Creates a unique scope name based on the given seed."
        return Topology._generate_unique_name(seed, self.scope_names)

    def declare_scope(self, seed, parent_scopes=None, options=None, naming=None):
        """
        Creates a new :class:`Scope <skl2onnx.common._topology.Scope>`
        and appends it to the list of existing scopes.
        Every initial type is declared as an input of the new scope.

        :param seed: seed of the scope name
        :param parent_scopes: unused by this implementation
        :param options: options given to the scope
        :param naming: naming strategy given to the scope
        :return: the new scope
        :raises RuntimeError: if a scope was already created
        """
        if len(self.scopes) != 0:
            raise RuntimeError("Only one scope can be created.")
        scope = Scope(
            self.get_unique_scope_name(seed),
            target_opset=self.target_opset,
            custom_shape_calculators=self.custom_shape_calculators,
            options=options,
            registered_models=self.registered_models,
            naming=naming,
        )

        # Declare input variables.
        # They should be the inputs of the scikit-learn
        # model you want to convert into ONNX.
        for var_name, initial_type in self.initial_types:
            scope.declare_local_input(var_name, initial_type, rename=False)
        self.scopes.append(scope)
        return scope

    def unordered_operator_iterator(self):
        "Iterates over the operators of every scope."
        for scope in self.scopes:
            for operator in scope.operators.values():
                yield operator

    def unordered_variable_iterator(self):
        "Iterates over the variables of every scope."
        for scope in self.scopes:
            for variable in scope.variables.values():
                yield variable

    def call_converter(self, operator, container, verbose=0):
        """
        Calls converter for operator *operator*.
        The converter is searched in the custom conversion functions
        (by model type, then by alias), then in method
        *onnx_converter* of the raw operator if it exists and
        finally among the registered converters.

        :param operator: operator to convert
        :param container: container receiving the ONNX nodes
        :param verbose: displays information if positive
        :raises MissingConverter: if no converter is registered
            for the operator alias
        """
        mtype = type(operator.raw_operator)
        if mtype in self.custom_conversion_functions:
            conv = self.custom_conversion_functions[mtype]
        elif operator.type in self.custom_conversion_functions:
            conv = self.custom_conversion_functions[operator.type]
        elif hasattr(operator.raw_operator, "onnx_converter"):
            conv = operator.raw_operator.onnx_converter()
        else:
            # Convert the selected operator into some ONNX objects and
            # save them into the container
            try:
                conv = _registration.get_converter(operator.type)
            except ValueError as exc:
                # The converted model is stored in attribute raw_operator
                # (see mtype above), its type is reported to the user.
                raise MissingConverter(
                    "Unable to find converter for alias '{}' type "
                    "'{}'. You may raise an issue at "
                    "https://github.com/onnx/sklearn-onnx/issues."
                    "".format(operator.type, mtype)
                ) from exc

        container.validate_options(operator)
        if verbose > 0:
            print("[call_converter] call converter for %r." % operator.type)
        logger.debug(
            "[Conv] call %r fed %r - %r",
            operator,
            "".join(str(i.is_fed) for i in operator.inputs),
            "".join(str(i.is_fed) for i in operator.outputs),
        )
        conv(self.scopes[0], operator, container)
        logger.debug("[Conv] end - %r", operator)

    def call_shape_calculator(self, operator):
        """
        Calls shape_calculator for operator *operator*.
        A custom shape calculator (by model type, then by alias) or
        method *onnx_shape_calculator* of the raw operator is used
        if available, otherwise ``operator.infer_types()`` is called.
        """
        mtype = type(operator.raw_operator)
        if mtype in self.custom_shape_calculators:
            # overwritten operator.
            source = "custom"
            shape_calc = self.custom_shape_calculators[mtype]
        elif operator.type in self.custom_shape_calculators:
            source = "custom"
            shape_calc = self.custom_shape_calculators[operator.type]
        elif hasattr(operator.raw_operator, "onnx_shape_calculator"):
            source = "onnx_shape_calculator"
            shape_calc = operator.raw_operator.onnx_shape_calculator()
        else:
            source = ""
            shape_calc = None

        if shape_calc is not None:
            logger.debug(
                "[Shape1] %r fed %r - %r (source=%r)",
                operator,
                ",".join(str(i.is_fed) for i in operator.inputs),
                ",".join(str(i.is_fed) for i in operator.outputs),
                source,
            )
            shape_calc(operator)
        else:
            logger.debug("[Shape2] call infer_types for %r", operator)
            operator.infer_types()

    def _initialize_graph_status_for_traversing(self):
        """
        Initialize the status of all variables and operators before
        traversing the graph. Only used by convert_operators.
        Input variables of the scope are marked as fed, every
        operator is marked as not evaluated.

        :raises RuntimeError: if there is not exactly one scope
            or if the scope has no input
        """
        if len(self.scopes) != 1:
            raise RuntimeError("Only one scope is allowed not %d." % len(self.scopes))
        input_names = set(v.onnx_name for v in self.scopes[0].input_variables)
        if len(input_names) == 0:
            raise RuntimeError("No detected inputs.")
        for variable in self.unordered_variable_iterator():
            is_input = variable.onnx_name in input_names
            variable.init_status(is_fed=is_input)

        for operator in self.unordered_operator_iterator():
            operator.init_status(is_evaluated=False)

    def _propagate_status(self, operator, container, fed_variables, verbose=0):
        """
        Propagates status *is_fed* based on output variable
        and node added in the container.

        :param operator: operator which was just converted
            (only used to display information)
        :param container: container holding the ONNX nodes
        :param fed_variables: dictionary of the fed results indexed
            by name, it is updated by the function
        :param verbose: displays information if greater than 1
        """
        if verbose > 1:
            print("[_propagate_status] after op=%r" % operator)

        # Maps every result name to the nodes consuming it.
        consumers = {}
        for node in container.nodes:
            for i in node.input:
                if i not in consumers:
                    consumers[i] = []
                consumers[i].append(node)

        if verbose > 1:
            print(
                "[_propagate_status] newly fed=%r"
                % list(v.onnx_name for v in operator.outputs if v.is_fed)
            )
        stack = list(fed_variables)
        scope = self.scopes[0]
        while len(stack) > 0:
            # Nodes consuming at least one of the names in the stack,
            # indexed by id to process each of them once.
            nodes = {}
            for name in stack:
                if name not in consumers:
                    continue
                for n in consumers[name]:
                    nodes[id(n)] = n
            stack = []
            for node in nodes.values():
                if all(fed_variables.get(n, False) for n in node.input):
                    for o in node.output:
                        if o not in fed_variables:
                            if verbose > 1:
                                print("[_propagate_status] add=%r" % o)
                            fed_variables[o] = o
                            stack.append(o)
                            if o in scope.variables:
                                var = scope.variables[o]
                                var.init_status(is_fed=True)
                                if verbose > 1:
                                    print("[_propagate_status] fed=%r" % var)

    def convert_operators(self, container=None, verbose=0):
        """
        Calls all converters and shape_calculator for existing
        operators. It also processes new operators created by
        converters. Once every operator is evaluated, inputs and
        outputs are added to the container.

        :param container: container receiving the ONNX nodes,
            its initializers are read as soon as the function starts
        :param verbose: displays information while converting
        :raises RuntimeError: if an operator could not be evaluated
            or if an output variable is already fed before the
            operator producing it is converted
        :raises TypeError: if inputs or outputs of an operator
            do not have the expected types
        """

        def _check_operator_(operator):
            if not isinstance(operator.inputs, Operator.OperatorList):
                raise TypeError(
                    "operator.inputs must be a Operator.OperatorList "
                    "not %r." % type(operator.inputs)
                )
            if not isinstance(operator.outputs, Operator.OperatorList):
                raise TypeError(
                    "operator.outputs must be a Operator.OperatorList "
                    "not %r." % type(operator.outputs)
                )
            if any(not isinstance(i, Variable) for i in operator.inputs):
                raise TypeError(
                    "One input is not a Variable for operator %r - %r."
                    "" % (type(operator.raw_operator), operator)
                )
            if any(not isinstance(i, Variable) for i in operator.outputs):
                raise TypeError(
                    "One output is not a Variable for operator %r - %r."
                    "" % (type(operator.raw_operator), operator)
                )

        def _check_variable_in_(variable, operator):
            idop = id(operator)
            ids = set(id(op) for op in variable.operators_inputs_)
            if idop not in ids:
                raise RuntimeError(
                    "Operator %r not registered in the list of operators "
                    "of %r taking it as an input [\n%s]."
                    % (
                        operator,
                        variable,
                        "\n".join(map(str, variable.operators_inputs_)),
                    )
                )

        def _check_variable_out_(variable, operator):
            if variable.is_fed:
                # Builds a dump of the whole topology to help debugging.
                add = ["", "--DEBUG-INFO--"]
                for scope in self.scopes:
                    add.append("---")
                    add.append(pprint.pformat(scope.variable_name_mapping))
                    add.append("---")
                    for var in scope.variables.values():
                        add.append(
                            " is_fed=%s %s - n_in=%d n_out=%d"
                            % (
                                getattr(var, "is_fed", "?"),
                                var,
                                len(var.operators_inputs_),
                                len(var.operators_outputs_),
                            )
                        )
                    add.append("---")
                    for op in scope.operators.values():
                        add.append(
                            " is_evaluated=%s %s"
                            % (getattr(op, "is_evaluated", "?"), op)
                        )
                add.append("---")
                for v in operator.inputs:
                    add.append(" inputs={}".format(v))
                for v in operator.outputs:
                    add.append(" outputs={}".format(v))
                add.append("--- operator producing this variable--")
                for op in variable.operators_outputs_:
                    add.append(str(op))
                raise RuntimeError(
                    "A variable is already assigned ({}) "
                    "for operator '{}' (name='{}'). "
                    "operator.is_evaluated={}, inputs.is_fed={}, "
                    "outputs.is_fed={}. "
                    "This may still happen if a converter is a "
                    "combination of sub-estimators and one "
                    "of them is producing this output. "
                    "In that case, an identity node must be "
                    "added.{}".format(
                        variable,
                        operator.type,
                        operator.onnx_name,
                        operator.is_evaluated,
                        [v.is_fed for v in operator.inputs],
                        [v.is_fed for v in operator.outputs],
                        "\n".join(add),
                    )
                )

        if verbose > 0:
            print("[convert_operators] begin")
        self._initialize_graph_status_for_traversing()
        fed_variables = {i.name: i for i in container.initializers}
        changes = 1
        n_iter = 0
        # Iterates until no operator was converted during a whole pass.
        while changes > 0:
            n_iter += 1
            changes = 0
            ops = list(self.unordered_operator_iterator())
            if verbose > 0:
                print(
                    "[convert_operators] iteration %d - n_vars=%d "
                    "n_ops=%d" % (n_iter, len(fed_variables), len(ops))
                )
            for operator in ops:
                _check_operator_(operator)
                for var in operator.inputs:
                    if var.is_fed:
                        fed_variables[var.onnx_name] = var
                if (
                    all(variable.is_fed for variable in operator.inputs)
                    and not operator.is_evaluated
                ):
                    for variable in operator.inputs:
                        _check_variable_in_(variable, operator)
                    for variable in operator.outputs:
                        _check_variable_out_(variable, operator)

                    self.call_shape_calculator(operator)
                    self.call_converter(operator, container, verbose=verbose)

                    # If an operator contains a sequence of operators,
                    # output variables are not necessarily known at this stage.
                    operator.init_status(is_evaluated=True)
                    for variable in operator.outputs:
                        if all(op.is_evaluated for op in variable.operators_outputs_):
                            variable.init_status(is_fed=True)
                            fed_variables[variable.onnx_name] = variable
                    fed_variables.update(
                        {
                            i.name: i
                            for i in container.initializers
                            if i.name not in fed_variables
                        }
                    )
                    self._propagate_status(
                        operator, container, fed_variables, verbose=verbose
                    )

                    # unfed some variables (it happens when a node
                    # shares an output with another node)
                    rem = []
                    for n, var in fed_variables.items():
                        if not hasattr(var, "operators_outputs_"):
                            # initializer
                            continue
                        if any(not o.is_evaluated for o in var.operators_outputs_):
                            rem.append(n)
                    for r in rem:
                        v = fed_variables[r]
                        v.init_status(is_fed=False)
                        # The entry is removed with the key it was found
                        # with, which is always present in the dictionary.
                        del fed_variables[r]
                    changes += 1

            if verbose > 0:
                print(
                    "[convert_operators] end iter: %d - n_vars=%d"
                    % (n_iter, len(fed_variables))
                )
        if verbose > 0:
            print("[convert_operators] end.")

        # Last verification.
        not_evaluated = [
            op for op in self.unordered_operator_iterator() if not op.is_evaluated
        ]
        if len(not_evaluated) > 0:
            rows = ["---VARS---"]
            for var in self.unordered_variable_iterator():
                rows.append(
                    "is_fed=%r is_leaf=%r is_root=%r - %r - n_in=%d n_out=%d"
                    ""
                    % (
                        var.is_fed,
                        var.is_leaf,
                        var.is_root,
                        var,
                        len(var.operators_inputs_),
                        len(var.operators_outputs_),
                    )
                )
            rows.append("---OPERATORS---")
            for op in self.unordered_operator_iterator():
                rows.append("is_eval=%r - %r" % (op.is_evaluated, op))
            rows.append("---NODES---")
            for node in container.nodes:
                rows.append("%s: %r -> %r" % (node.op_type, node.input, node.output))

            raise RuntimeError(
                "Not all operators have been evaluated. A variable name "
                "is probably misspelled.\n%s"
                "" % "\n".join(rows)
            )

        # Input and output
        if len(self.scopes[0].input_variables) > 0:
            inputs = self.scopes[0].input_variables
        else:
            inputs = [v for v in self.unordered_variable_iterator() if v.is_root]
        for i in inputs:
            container.add_input(i)
        outputs = [v for v in self.unordered_variable_iterator() if v.is_leaf]

        # The function checks that for output variable,
        # raw_name equal onnx_name. It swaps names if it is not the case.
        to_swap = [out for out in outputs if out.raw_name != out.onnx_name]
        if len(to_swap) != 0:
            swaped = set()
            for var in to_swap:
                if var.raw_name in swaped:
                    continue
                swaped.add(var.raw_name)
                if verbose > 1:
                    print(
                        "[convert_operators] %r <-> %r."
                        % (var.raw_name, var.onnx_name)
                    )
                old_name = var.onnx_name
                new_name = var.raw_name

                try:
                    container.swap_names(old_name, new_name)
                except NotImplementedError as e:
                    logger.debug(
                        "[Topo] unable to swap %r and %r (%r).", old_name, new_name, e
                    )
                    continue

                for v in self.unordered_variable_iterator():
                    if v.onnx_name == old_name:
                        v.set_onnx_name(new_name)
                    elif v.onnx_name == new_name:
                        v.set_onnx_name(old_name)

        for o in outputs:
            container.add_output(o)
def convert_topology(
    topology,
    model_name,
    doc_string,
    target_opset,
    options=None,
    remove_identity=True,
    verbose=0,
):
    """
    This function is used to convert our Topology object defined in
    _parser.py into a ONNX model (type: ModelProto).

    :param topology: The Topology object we are going to convert
    :param model_name: GraphProto's name. Let "model" denote the
        returned model. The string "model_name" would be
        assigned to "model.graph.name."
    :param doc_string: A string attached to the produced model
    :param target_opset: number or dictionary,
        for example, 7 for ONNX 1.2, and 8 for ONNX 1.3,
        a dictionary is used to indicate different opset for
        different domains, None means the latest tested opset
    :param options: see :ref:`l-conv-options`
    :param remove_identity: removes identity nodes
    :param verbose: displays information while converting
    :return: a ONNX ModelProto
    :raises RuntimeError: if the target opset is higher than the opset
        supported by the installed onnx package or if the converted
        graph has no input or no output
    """
    if target_opset is None:
        target_opset = get_latest_tested_opset_version()
    if isinstance(target_opset, dict):
        onnx_target_opset = target_opset.get("", get_latest_tested_opset_version())
    else:
        onnx_target_opset = target_opset
    if onnx_target_opset > get_opset_number_from_onnx():
        found = get_opset_number_from_onnx()
        raise RuntimeError(
            "Parameter target_opset {} > {} is higher than the "
            "version of the installed onnx package. See "
            "https://github.com/onnx/onnx/blob/master/docs/"
            "Versioning.md#released-versions"
            ".".format(onnx_target_opset, found)
        )
    if onnx_target_opset > get_latest_tested_opset_version():
        warnings.warn(
            "Parameter target_opset {} > {} is higher than the "
            "the latest tested version"
            ".".format(onnx_target_opset, get_latest_tested_opset_version())
        )

    container = ModelComponentContainer(
        target_opset,
        options=options,
        registered_models=topology.registered_models,
        white_op=topology.raw_model._white_op,
        black_op=topology.raw_model._black_op,
        verbose=verbose,
    )

    # Traverse the graph from roots to leaves
    # This loop could eventually be parallelized.
    topology.convert_operators(container=container, verbose=verbose)
    container.ensure_topological_order()

    if len(container.inputs) == 0:
        raise RuntimeError("No detected inputs after conversion.")
    if len(container.outputs) == 0:
        raise RuntimeError("No detected outputs after conversion.")
    if verbose >= 2:
        print("---NODES---")
        for node in container.nodes:
            print(
                " %s - %s: %r -> %r"
                % (node.op_type, node.name, node.input, node.output)
            )

    # Create a graph from its main components
    if container.target_opset_onnx < 9:
        # When calling ModelComponentContainer's add_initializer(...),
        # nothing is added into the input list. However, for ONNX target
        # opset < 9, initializers should also be a part of model's
        # (GraphProto) inputs. Thus, we create ValueInfoProto objects
        # from initializers (type: TensorProto) directly and then add
        # them into model's input list.
        # Names of the existing inputs are collected once, the inputs
        # are not modified while the initializers are scanned.
        input_names = {value_info.name for value_info in container.inputs}
        extra_inputs = []  # ValueInfoProto list of the initializers
        for tensor in container.initializers:
            # Sometimes (especially when creating optional input values
            # such as RNN's initial hidden state), an initializer is also
            # one of the original model's input, so it has been added into
            # the container's input list. If this is the case, we need to
            # skip one iteration to avoid duplicated inputs.
            if tensor.name in input_names:
                continue

            # Initializers are always tensors so we can just call
            # make_tensor_value_info(...).
            value_info = make_tensor_value_info(
                tensor.name, tensor.data_type, tensor.dims
            )
            extra_inputs.append(value_info)

        # Before ONNX opset 9, initializers were needed to be passed in
        # with inputs.
        graph_inputs = container.inputs + extra_inputs
    else:
        # In ONNX opset 9 and above, initializers are included as
        # operator inputs and therefore do not need to be passed as
        # extra_inputs.
        graph_inputs = container.inputs

    graph = make_graph(
        container.nodes,
        model_name,
        graph_inputs,
        container.outputs,
        container.initializers,
    )

    # Add extra information related to the graph
    graph.value_info.extend(container.value_info)

    # Create model
    onnx_model = make_model(graph)

    # Update domain version
    opv = min(
        onnx_target_opset, _get_main_opset_version(onnx_model) or onnx_target_opset
    )
    if not _update_domain_version(container, onnx_model, verbose=verbose):
        # Main opset was not added. Doing it here.
        op_set = onnx_model.opset_import.add()
        op_set.domain = ""
        op_set.version = opv
        if verbose > 0:
            print("[convert_topology] +opset: name=%r, version=%s" % ("", opv))

    # Add extra information
    irv = OPSET_TO_IR_VERSION.get(opv, onnx_proto.IR_VERSION)
    onnx_model.ir_version = irv
    onnx_model.producer_name = utils.get_producer()
    onnx_model.producer_version = utils.get_producer_version()
    onnx_model.domain = utils.get_domain()
    onnx_model.model_version = utils.get_model_version()
    onnx_model.doc_string = doc_string

    # Removes many identity nodes,
    # the converter may introduce identity nodes
    # after a zipmap operator and onnx <= 1.7 does not
    # support that. It does not use onnxconverter-common
    # as the optimizer only support opset >= 9.
    if remove_identity:
        onnx_model = onnx_remove_node_identity(onnx_model)

    return onnx_model
def _update_domain_version(container, onnx_model, verbose=0):
    """
    Fills ``onnx_model.opset_import`` with the opsets used by the
    nodes stored in the container.

    :param container: container holding the converted nodes, it
        exposes *node_domain_version_pair_sets* (pairs domain, version)
        and method *target_opset_any_domain*
    :param onnx_model: model to update
    :param verbose: displays information if positive
    :return: True if the main domain ``""`` is among the domains
        used by the container, False otherwise
    :raises RuntimeError: if a domain requires a version higher than
        the one supported by this library or higher than the target
        opset specified for this domain
    """
    # Merge operator sets for the same domain, the largest version
    # number would be kept. A missing version (None) never overrides
    # a known one and is skipped below.
    purified_operator_set = dict()
    for op_domain, op_version in container.node_domain_version_pair_sets:
        known = purified_operator_set.get(op_domain)
        if op_domain not in purified_operator_set or known is None:
            purified_operator_set[op_domain] = op_version
        elif op_version is not None:
            purified_operator_set[op_domain] = max(known, op_version)

    # Fill operator sets
    i = 0
    for op_domain, op_version in purified_operator_set.items():
        if op_version is None:
            continue
        if i == 0 and len(onnx_model.opset_import) == 1:
            # Overwrite the default operator set created by
            # make_model(...)
            op_set = onnx_model.opset_import[0]
        else:
            # Just create one ONNX element in opset_import
            op_set = onnx_model.opset_import.add()
        if verbose > 0:
            print(
                "[_update_domain_version] +opset %d: name=%r, version=%s"
                % (i, op_domain, op_version)
            )
        op_set.domain = op_domain
        # The supported version is checked for every domain, the main
        # domain included.
        max_supported = get_default_opset_for_domain(op_domain)
        if max_supported is not None and max_supported < op_version:
            raise RuntimeError(
                "The model is using version %d of domain %r not supported "
                "yet by this library. You need to specify "
                "target_opset={%r: %r}."
                % (op_version, op_domain, op_domain, max_supported)
            )
        op_set.version = op_version

        i += 1
        if container.target_opset_any_domain(op_domain) < op_version:
            raise RuntimeError(
                "The specified opset %d is too low to convert "
                "this model, which requires at least opset "
                "%d." % (container.target_opset_any_domain(op_domain), op_version)
            )
    return "" in purified_operator_set


def _get_main_opset_version(model):
    """
    Returns the main opset version.

    :param model: model whose *opset_import* is scanned
    :return: the version of domain ``""`` if it is imported, otherwise
        the main opset associated to the imported version of domain
        ``"ai.onnx.ml"`` in *OPSET_ML_TO_OPSET* (None if unknown),
        None if neither domain is imported
    """
    mld = None
    for op in model.opset_import:
        if op.domain == "":
            return op.version
        if op.domain == "ai.onnx.ml":
            mld = op.version
    if mld is not None:
        return OPSET_ML_TO_OPSET.get(mld, None)
    return None