# Source code for onnx.numpy_helper

# Copyright (c) ONNX Project Contributors
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

import sys
from typing import Any, Sequence

import numpy as np
import numpy.typing as npt

import onnx._custom_element_types as custom_np_types
from onnx import MapProto, OptionalProto, SequenceProto, TensorProto, helper, subbyte
from onnx.external_data_helper import load_external_data_for_tensor, uses_external_data


def combine_pairs_to_complex(fa: Sequence[int]) -> list[complex]:
    """Combine consecutive (real, imaginary) pairs into complex numbers.

    A trailing unpaired element, if any, is ignored.
    """
    reals = fa[0::2]
    imags = fa[1::2]
    return [complex(re, im) for re, im in zip(reals, imags)]
def bfloat16_to_float32(
    data: np.int16 | np.int32 | np.ndarray,
    dims: int | Sequence[int] | None = None,
) -> np.ndarray:
    """Converts ndarray of bf16 (as uint32) to f32 (as uint32).

    Args:
        data: A numpy array, empty dimensions are allowed if dims is None.
        dims: If specified, the function reshapes the results.

    Returns:
        A numpy array of float32 with the same dimension if dims is None,
        or reshaped to dims if specified
    """

    def _widen(values: np.ndarray) -> np.ndarray:
        # A bfloat16 is the upper half of a float32 bit pattern, so shifting
        # the 16-bit payload left by 16 reconstructs the full float32 bits.
        return (values.astype(np.int32) << 16).view(np.float32)

    if dims is not None:
        return _widen(data).reshape(dims)  # type: ignore[no-any-return]
    if data.shape == ():
        # 0-d input: wrap in a 1-element array, convert, and unwrap.
        return _widen(np.array([data]))[0]  # type: ignore[no-any-return]
    return _widen(data)  # type: ignore[no-any-return]
def _float8e4m3_to_float32_scalar(ival: int, fn: bool, uz: bool) -> np.float32:
    """Decode one float8 E4M3 value (as an int in [0, 255]) into a float32.

    The float32 is rebuilt bit-by-bit: sign, rebiased exponent, and the
    3-bit mantissa placed in the top of the float32 mantissa field.

    Args:
        ival: The raw 8-bit encoding.
        fn: Finite-only variant; only ``fn=True`` is implemented.
        uz: No-negative-zero variant (selects bias 8 and NaN at 0x80).

    Raises:
        NotImplementedError: If ``fn`` is False.
        ValueError: If ``ival`` is outside [0, 255].
    """
    if not fn:
        raise NotImplementedError("fn=False is not implemented.")
    if ival < 0 or ival > 255:  # noqa: PLR2004
        raise ValueError(f"{ival} is not a float8.")
    if uz:
        exponent_bias = 8
        # 0x80 is the single NaN encoding in the uz variant.
        if ival == 0x80:  # noqa: PLR2004
            return np.nan  # type: ignore[return-value]
    else:
        exponent_bias = 7
        # 0xFF / 0x7F are the -NaN / +NaN encodings in the fn variant.
        if ival == 255:  # noqa: PLR2004
            return np.float32(-np.nan)
        if ival == 127:  # noqa: PLR2004
            return np.float32(np.nan)
    ival = np.uint32(ival)  # type: ignore[assignment]
    expo = (ival & 0x78) >> 3  # 4 exponent bits
    mant = ival & 0x07  # 3 mantissa bits
    sign = ival & 0x80  # sign bit
    res = sign << 24  # sign moves to float32 bit 31
    if expo == 0:
        # Subnormal float8: renormalize by shifting the mantissa left until
        # its implicit leading bit appears, decrementing the exponent each time.
        if mant > 0:
            expo = 0x7F - exponent_bias
            if mant & 0x4 == 0:
                mant &= 0x3
                mant <<= 1
                expo -= 1
            if mant & 0x4 == 0:
                mant &= 0x3
                mant <<= 1
                expo -= 1
            res |= (mant & 0x3) << 21
            res |= expo << 23
    else:
        # Normal number: place mantissa in float32's top mantissa bits and
        # rebias the exponent from the float8 bias to float32's 127.
        res |= mant << 20
        expo += 0x7F - exponent_bias
        res |= expo << 23
    f = np.uint32(res).view(np.float32)
    return f


# Elementwise version; fn/uz are broadcast-excluded keyword parameters.
_float8e4m3_to_float32 = np.vectorize(
    _float8e4m3_to_float32_scalar, excluded=["fn", "uz"]
)
def float8e4m3_to_float32(
    data: np.int16 | np.int32 | np.ndarray,
    dims: int | Sequence[int] | None = None,
    fn: bool = True,
    uz: bool = False,
) -> np.ndarray:
    """Converts ndarray of float8, e4m3 (as uint32) to f32 (as uint32).

    See :ref:`onnx-detail-float8` for technical details.

    Args:
        data: A numpy array, empty dimensions are allowed if dims is None.
        dims: If specified, the function reshapes the results.
        fn: No infinite values.
        uz: No negative zero.

    Returns:
        A numpy array of float32 with the same dimension if dims is None,
        or reshaped to dims if specified.
    """
    if not fn:
        raise NotImplementedError(
            "float32_to_float8e4m3 not implemented with fn=False."
        )
    converted = _float8e4m3_to_float32(data, fn=fn, uz=uz)
    if dims is not None:
        return converted.reshape(dims)  # type: ignore[no-any-return]
    return converted  # type: ignore[no-any-return]
def _float8e5m2_to_float32_scalar(ival: int, fn: bool, uz: bool) -> np.float32:
    """Decode one float8 E5M2 value (as an int in [0, 255]) into a float32.

    The float32 is rebuilt bit-by-bit: sign, rebiased exponent, and the
    2-bit mantissa placed in the top of the float32 mantissa field.

    Args:
        ival: The raw 8-bit encoding.
        fn: Finite-only variant flag.
        uz: No-negative-zero variant flag; fn and uz must match.

    Raises:
        NotImplementedError: If exactly one of ``fn``/``uz`` is set.
    """
    if fn and uz:
        # fnuz variant: single NaN at 0x80, bias 16, no infinities.
        if ival == 0x80:  # noqa: PLR2004
            return np.float32(np.nan)
        exponent_bias = 16
    elif not fn and not uz:
        # Standard e5m2: NaN payloads 253-255 / 125-127, infinities at 252 / 124.
        if ival in {253, 254, 255}:
            return np.float32(-np.nan)
        if ival in {125, 126, 127}:
            return np.float32(np.nan)
        if ival == 252:  # noqa: PLR2004
            return np.float32(-np.inf)
        if ival == 124:  # noqa: PLR2004
            return np.float32(np.inf)
        exponent_bias = 15
    else:
        raise NotImplementedError("fn and uz must be both False or True.")
    ival = np.uint32(ival)  # type: ignore[assignment]
    expo = (ival & 0x7C) >> 2  # 5 exponent bits
    mant = ival & 0x03  # 2 mantissa bits
    sign = ival & 0x80  # sign bit
    res = sign << 24  # sign moves to float32 bit 31
    if expo == 0:
        # Subnormal float8: renormalize by shifting the mantissa left until
        # its implicit leading bit appears, decrementing the exponent.
        if mant > 0:
            expo = 0x7F - exponent_bias
            if mant & 0x2 == 0:
                mant &= 0x1
                mant <<= 1
                expo -= 1
            res |= (mant & 0x1) << 22
            res |= expo << 23
    else:
        # Normal number: place mantissa in float32's top mantissa bits and
        # rebias the exponent from the float8 bias to float32's 127.
        res |= mant << 21
        expo += 0x7F - exponent_bias
        res |= expo << 23
    f = np.uint32(res).view(np.float32)
    return f


# Elementwise version; fn/uz are broadcast-excluded keyword parameters.
_float8e5m2_to_float32 = np.vectorize(
    _float8e5m2_to_float32_scalar, excluded=["fn", "uz"]
)
def float8e5m2_to_float32(
    data: np.int16 | np.int32 | np.ndarray,
    dims: int | Sequence[int] | None = None,
    fn: bool = False,
    uz: bool = False,
) -> np.ndarray:
    """Converts ndarray of float8, e5m2 (as uint32) to f32 (as uint32).

    See :ref:`onnx-detail-float8` for technical details.

    Args:
        data: A numpy array, empty dimensions are allowed if dims is None.
        dims: If specified, the function reshapes the results.
        fn: No infinite values.
        uz: No negative zero.

    Returns:
        A numpy array of float32 with the same dimension if dims is None,
        or reshaped to dims if specified
    """
    decoded = _float8e5m2_to_float32(data, fn=fn, uz=uz)
    if dims is not None:
        return decoded.reshape(dims)  # type: ignore[no-any-return]
    return decoded  # type: ignore[no-any-return]
def unpack_int4(
    data: np.int32 | np.ndarray,
    dims: int | Sequence[int],
    signed: bool,
) -> np.ndarray:
    """Converts ndarray of int4 (as packed uint8) to f32

    See :ref:`onnx-detail-int4` for technical details.

    Args:
        data: A numpy array, empty dimensions are allowed if dims is None.
        dims: The dimensions are used to reshape the unpacked buffer
        signed: Whether the 4 bit integer is signed or unsigned

    Returns:
        A numpy array of float32 reshaped to dims.
    """
    # Each byte holds two nibbles; unpack both per element in one pass.
    unpack_pair = np.frompyfunc(
        lambda packed: subbyte.unpack_single_4bitx2(packed, signed), 1, 2
    )
    high, low = unpack_pair(data.ravel())
    flat = np.empty((high.size + low.size,), dtype=np.float32)
    flat[0::2] = high
    flat[1::2] = low
    # An odd element count leaves one nibble of padding at the end; drop it.
    if flat.size == np.prod(dims) + 1:
        flat = flat.ravel()[:-1]
    return flat.reshape(dims)
def unpacked_float4e2m1_to_float32(x: npt.NDArray[np.uint8]) -> npt.NDArray[np.float32]:
    """Evaluate the numerical value of an array of unpacked float4e2m1 values (as uint8)

    See :ref:`onnx-detail-int4` for technical details.

    Args:
        x: an array of uint8 elements representing a float4e2m1 (using the 4 LSB)

    Returns:
        An array of float32 elements representing the values of the float4e2m1 input.
    """
    # x is stored in 4 LSB of int: bit 3 = sign, bits 1-2 = exponent, bit 0 = mantissa.
    sign = np.where(np.bitwise_and(x, 0x08), -1, 1)
    mantissa = (x & 0x01).astype(np.float32)
    exponent = ((x & 0x06) >> 1).astype(np.float32)

    val = np.where(
        exponent == 0,
        sign * (mantissa / 2.0),  # denormalized
        sign * (1.0 + mantissa / 2.0) * 2.0 ** (exponent - 1),  # normalized
    )
    return val


def unpack_float4e2m1(
    data: npt.NDArray[np.uint8],
    dims: int | Sequence[int],
) -> np.ndarray:
    """Converts ndarray of float4e2m1 (as packed uint8) to f32

    See :ref:`onnx-detail-float4` for technical details.

    Args:
        data: A numpy array, empty dimensions are allowed if dims is None.
        dims: The dimensions are used to reshape the unpacked buffer

    Returns:
        A numpy array of float32 reshaped to dims.
    """
    res = subbyte.unpack_4bitx2(data.ravel(), dims)
    return unpacked_float4e2m1_to_float32(res)


def _to_array(tensor: TensorProto, base_dir: str = "") -> np.ndarray:  # noqa: PLR0911
    """Converts a tensor def object to a numpy array.

    Args:
        tensor: a TensorProto object.
        base_dir: if external tensor exists, base_dir can help to find the path to it

    Returns:
        arr: the converted array.
    """
    if tensor.HasField("segment"):
        raise ValueError("Currently not supporting loading segments.")
    if tensor.data_type == TensorProto.UNDEFINED:
        raise TypeError("The element type in the input tensor is not defined.")

    tensor_dtype = tensor.data_type
    np_dtype = helper.tensor_dtype_to_np_dtype(tensor_dtype)
    # Some ONNX types (e.g. float16, sub-byte types) are stored in a wider
    # "storage" field of the proto; resolve both the storage dtype and field.
    storage_np_dtype = helper.tensor_dtype_to_np_dtype(
        helper.tensor_dtype_to_storage_tensor_dtype(tensor_dtype)
    )
    storage_field = helper.tensor_dtype_to_field(tensor_dtype)
    dims = tensor.dims

    if tensor.data_type == TensorProto.STRING:
        utf8_strings = getattr(tensor, storage_field)
        ss = [s.decode("utf-8") for s in utf8_strings]
        return np.asarray(ss).astype(np_dtype).reshape(dims)

    # Load raw data from external tensor if it exists
    if uses_external_data(tensor):
        load_external_data_for_tensor(tensor, base_dir)

    if tensor.HasField("raw_data"):
        # Raw_bytes support: using frombuffer.
        raw_data = tensor.raw_data
        if sys.byteorder == "big":
            # Convert endian from little to big
            raw_data = np.frombuffer(raw_data, dtype=np_dtype).byteswap().tobytes()

        # manually convert bf16 since there's no numpy support
        if tensor_dtype == TensorProto.BFLOAT16:
            data = np.frombuffer(raw_data, dtype=np.int16)
            return bfloat16_to_float32(data, dims)
        # float8 variants: decode the raw bytes elementwise.
        if tensor_dtype == TensorProto.FLOAT8E4M3FN:
            data = np.frombuffer(raw_data, dtype=np.int8)
            return float8e4m3_to_float32(data, dims)
        if tensor_dtype == TensorProto.FLOAT8E4M3FNUZ:
            data = np.frombuffer(raw_data, dtype=np.int8)
            return float8e4m3_to_float32(data, dims, uz=True)
        if tensor_dtype == TensorProto.FLOAT8E5M2:
            data = np.frombuffer(raw_data, dtype=np.int8)
            return float8e5m2_to_float32(data, dims)
        if tensor_dtype == TensorProto.FLOAT8E5M2FNUZ:
            data = np.frombuffer(raw_data, dtype=np.int8)
            return float8e5m2_to_float32(data, dims, fn=True, uz=True)
        # 4-bit types are packed two-per-byte in raw_data.
        if tensor_dtype == TensorProto.UINT4:
            data = np.frombuffer(raw_data, dtype=np.uint8)
            return unpack_int4(data, dims, signed=False)
        if tensor_dtype == TensorProto.INT4:
            data = np.frombuffer(raw_data, dtype=np.int8)
            return unpack_int4(data, dims, signed=True)
        if tensor_dtype == TensorProto.FLOAT4E2M1:
            data = np.frombuffer(raw_data, dtype=np.uint8)
            return unpack_float4e2m1(data, dims)  # type: ignore[arg-type]
        return np.frombuffer(raw_data, dtype=np_dtype).reshape(dims)  # type: ignore[no-any-return]

    # No raw_data: values live in the typed repeated fields (e.g. int32_data).
    # float16 is stored as int32 (uint16 type); Need view to get the original value
    if tensor_dtype == TensorProto.FLOAT16:
        return (
            np.asarray(tensor.int32_data, dtype=np.uint16)
            .reshape(dims)
            .view(np.float16)
        )

    # bfloat16 is stored as int32 (uint16 type); no numpy support for bf16
    if tensor_dtype == TensorProto.BFLOAT16:
        data = np.asarray(tensor.int32_data, dtype=np.int32)
        return bfloat16_to_float32(data, dims)

    if tensor_dtype == TensorProto.FLOAT8E4M3FN:
        data = np.asarray(tensor.int32_data, dtype=np.int32)
        return float8e4m3_to_float32(data, dims)

    if tensor_dtype == TensorProto.FLOAT8E4M3FNUZ:
        data = np.asarray(tensor.int32_data, dtype=np.int32)
        return float8e4m3_to_float32(data, dims, uz=True)

    if tensor_dtype == TensorProto.FLOAT8E5M2:
        data = np.asarray(tensor.int32_data, dtype=np.int32)
        return float8e5m2_to_float32(data, dims)

    if tensor_dtype == TensorProto.FLOAT8E5M2FNUZ:
        data = np.asarray(tensor.int32_data, dtype=np.int32)
        return float8e5m2_to_float32(data, dims, fn=True, uz=True)

    if tensor_dtype == TensorProto.UINT4:
        data = np.asarray(tensor.int32_data, dtype=storage_np_dtype)
        return unpack_int4(data, dims, signed=False)

    if tensor_dtype == TensorProto.INT4:
        data = np.asarray(tensor.int32_data, dtype=storage_np_dtype)
        return unpack_int4(data, dims, signed=True)

    data = getattr(tensor, storage_field)
    if tensor_dtype in (TensorProto.COMPLEX64, TensorProto.COMPLEX128):
        # Complex values are stored as interleaved (real, imag) pairs.
        data = combine_pairs_to_complex(data)  # type: ignore[assignment,arg-type]

    return np.asarray(data, dtype=storage_np_dtype).astype(np_dtype).reshape(dims)
def to_array(tensor: TensorProto, base_dir: str = "") -> np.ndarray:
    """Converts a tensor def object to a numpy array.

    Supports types defined in :mod:`onnx._custom_element_types`.

    Args:
        tensor: a TensorProto object.
        base_dir: if external tensor exists, base_dir can help to find the path to it

    Returns:
        arr: the converted array.
    """
    elem_type = tensor.data_type

    if elem_type == TensorProto.BFLOAT16:
        if uses_external_data(tensor):
            load_external_data_for_tensor(tensor, base_dir)
        data = tensor.int32_data
        shape = tuple(tensor.dims)
        # Copy the stored integers into the custom bfloat16 dtype elementwise.
        y = np.empty(shape, dtype=custom_np_types.bfloat16).ravel()
        for i, d in enumerate(data):
            y[i] = d
        return y.reshape(shape)

    if elem_type in (
        TensorProto.FLOAT8E4M3FN,
        TensorProto.FLOAT8E4M3FNUZ,
        TensorProto.FLOAT8E5M2,
        TensorProto.FLOAT8E5M2FNUZ,
    ):
        # Map each float8 variant to its custom numpy dtype.
        m = {
            TensorProto.FLOAT8E4M3FN: custom_np_types.float8e4m3fn,
            TensorProto.FLOAT8E4M3FNUZ: custom_np_types.float8e4m3fnuz,
            TensorProto.FLOAT8E5M2: custom_np_types.float8e5m2,
            TensorProto.FLOAT8E5M2FNUZ: custom_np_types.float8e5m2fnuz,
        }
        if uses_external_data(tensor):
            load_external_data_for_tensor(tensor, base_dir)
        # Values may be serialized as raw bytes or in int32_data.
        if tensor.HasField("raw_data"):
            data = tensor.raw_data  # type: ignore[assignment]
        else:
            data = tensor.int32_data
        shape = tuple(tensor.dims)
        y = np.empty(shape, dtype=m[elem_type]).ravel()  # type: ignore[index]
        for i, d in enumerate(data):
            y[i] = d
        return y.reshape(shape)

    if elem_type in (TensorProto.UINT4, TensorProto.INT4):
        if uses_external_data(tensor):
            load_external_data_for_tensor(tensor, base_dir)
        if tensor.HasField("raw_data"):
            data = tensor.raw_data  # type: ignore[assignment]
        else:
            data = tensor.int32_data
        shape = tuple(tensor.dims)
        m = {
            TensorProto.INT4: custom_np_types.int4,
            TensorProto.UINT4: custom_np_types.uint4,
        }
        dtype = m[elem_type]  # type: ignore[index]
        signed = elem_type == TensorProto.INT4

        # 2 packed int4 elements must be represented as a single uint8 value.
        # Therefore, y is np.uint8 (not the dtype to which the int4 maps)
        y = np.empty(len(data), dtype=np.uint8).ravel()  # type: ignore[assignment]
        for i, d in enumerate(data):
            y[i] = d
        unpacked_data = unpack_int4(y, dims=shape, signed=signed)
        return unpacked_data.astype(dtype)

    if elem_type == TensorProto.FLOAT4E2M1:
        if uses_external_data(tensor):
            load_external_data_for_tensor(tensor, base_dir)
        if tensor.HasField("raw_data"):
            data = tensor.raw_data  # type: ignore[assignment]
        else:
            data = tensor.int32_data
        shape = tuple(tensor.dims)

        # 2 packed float4e2m1 elements must be represented as a single uint8 value.
        # Therefore, y is np.uint8.
        y = np.empty(len(data), dtype=custom_np_types.float4e2m1).ravel()  # type: ignore[assignment]
        for i, d in enumerate(data):
            y[i] = d
        unpacked_data = subbyte.unpack_4bitx2(y, dims=shape)  # type: ignore[arg-type]
        return unpacked_data.astype(custom_np_types.float4e2m1)

    # All remaining types use the standard conversion path.
    return _to_array(tensor, base_dir=base_dir)
def _from_array(arr: np.ndarray, name: str | None = None) -> TensorProto:
    """Converts a numpy array to a tensor def.

    Args:
        arr: a numpy array.
        name: (optional) the name of the tensor.

    Returns:
        TensorProto: the converted tensor def.

    Raises:
        TypeError: If ``arr`` is not a numpy array or scalar.
        RuntimeError: If the numpy dtype has no ONNX equivalent.
        NotImplementedError: If an object array holds something other than
            strings, bytes, or arrays of bytes.
    """
    if not isinstance(arr, (np.ndarray, np.generic)):
        raise TypeError(
            f"arr must be of type np.generic or np.ndarray, got {type(arr)}"
        )

    tensor = TensorProto()
    tensor.dims.extend(arr.shape)
    if name:
        tensor.name = name

    if arr.dtype == object or np.issubdtype(arr.dtype, np.str_):
        # Special care for strings.
        tensor.data_type = helper.np_dtype_to_tensor_dtype(arr.dtype)
        # TODO: Introduce full string support.
        # We flatten the array in case there are 2-D arrays are specified
        # We throw the error below if we have a 3-D array or some kind of other
        # object. If you want more complex shapes then follow the below instructions.
        # Unlike other types where the shape is automatically inferred from
        # nested arrays of values, the only reliable way now to feed strings
        # is to put them into a flat array then specify type astype(object)
        # (otherwise all strings may have different types depending on their length)
        # and then specify shape .reshape([x, y, z])
        flat_array = arr.flatten()
        for e in flat_array:
            if isinstance(e, str):
                tensor.string_data.append(e.encode("utf-8"))
            elif isinstance(e, np.ndarray):
                for s in e:
                    if isinstance(s, str):
                        tensor.string_data.append(s.encode("utf-8"))
                    elif isinstance(s, bytes):
                        tensor.string_data.append(s)
            elif isinstance(e, bytes):
                tensor.string_data.append(e)
            else:
                raise NotImplementedError(
                    "Unrecognized object in the object array, expect a string, or array of bytes: ",
                    str(type(e)),
                )
        return tensor

    # For numerical types, directly use numpy raw bytes.
    try:
        dtype = helper.np_dtype_to_tensor_dtype(arr.dtype)
    except KeyError as e:
        raise RuntimeError(f"Numpy data type not understood yet: {arr.dtype!r}") from e
    tensor.data_type = dtype
    tensor.raw_data = arr.tobytes()  # note: tobytes() is only after 1.9.
    if sys.byteorder == "big":
        # Convert endian from big to little
        convert_endian(tensor)

    return tensor
def from_array(tensor: np.ndarray, name: str | None = None) -> TensorProto:
    """Converts an array into a TensorProto including
    supported type defined in :mod:`onnx._custom_element_types`.

    Args:
        tensor: a numpy array.
        name: (optional) the name of the tensor.

    Returns:
        TensorProto: the converted tensor def.

    Raises:
        ValueError: If a 4-bit tensor has an odd number of elements (two
            values must share each byte).
    """
    if not isinstance(tensor, np.ndarray):
        return _from_array(tensor, name)

    # Detect custom element dtypes: both the dtype comparison and the field
    # name in dt.descr must match before we treat it as that ONNX type.
    dt = tensor.dtype
    if dt == custom_np_types.float8e4m3fn and dt.descr[0][0] == "e4m3fn":
        to = TensorProto.FLOAT8E4M3FN
        dt_to = np.uint8  # type: ignore[assignment]
    elif dt == custom_np_types.float8e4m3fnuz and dt.descr[0][0] == "e4m3fnuz":
        to = TensorProto.FLOAT8E4M3FNUZ
        dt_to = np.uint8  # type: ignore[assignment]
    elif dt == custom_np_types.float8e5m2 and dt.descr[0][0] == "e5m2":
        to = TensorProto.FLOAT8E5M2
        dt_to = np.uint8  # type: ignore[assignment]
    elif dt == custom_np_types.float8e5m2fnuz and dt.descr[0][0] == "e5m2fnuz":
        to = TensorProto.FLOAT8E5M2FNUZ
        dt_to = np.uint8  # type: ignore[assignment]
    elif dt == custom_np_types.bfloat16 and dt.descr[0][0] == "bfloat16":
        to = TensorProto.BFLOAT16
        dt_to = np.uint16  # type: ignore[assignment]
    elif dt == custom_np_types.int4 and dt.descr[0][0] == "int4":
        to = TensorProto.INT4
        dt_to = np.int8  # type: ignore[assignment]
    elif dt == custom_np_types.uint4 and dt.descr[0][0] == "uint4":
        to = TensorProto.UINT4
        dt_to = np.uint8  # type: ignore[assignment]
    elif dt == custom_np_types.float4e2m1 and dt.descr[0][0] == "float4e2m1":
        to = TensorProto.FLOAT4E2M1
        dt_to = np.uint8
    else:
        # Not a custom element type: fall back to the standard conversion.
        return _from_array(tensor, name)

    if to in (TensorProto.UINT4, TensorProto.INT4, TensorProto.FLOAT4E2M1):
        # Pack two 4-bit values per byte: even indices take the low nibble,
        # odd indices the high nibble.
        value = tensor.astype(dt_to).ravel()
        if value.size % 2 == 1:
            raise ValueError(
                f"The conversion of a tensor of INT4 or UINT4 requires an even size "
                f"(shape={tensor.shape}). Every byte stores two INT4 or two UINT4."
            )
        buffer = (value[::2] & 0x0F) + (value[1::2] << 4)
        pb = TensorProto()
        pb.dims.extend(tensor.shape)
        if name:
            pb.name = name
        pb.raw_data = buffer.tobytes()
        pb.data_type = to
        return pb

    # Other custom types: serialize through the storage dtype, then fix up
    # the declared element type.
    t = _from_array(tensor.astype(dt_to), name)
    t.data_type = to
    return t
def to_list(sequence: SequenceProto) -> list[Any]:
    """Converts a sequence def to a Python list.

    Args:
        sequence: a SequenceProto object.

    Returns:
        list: the converted list.
    """
    # (elem_type tag, proto field holding the values, converter) triples.
    converters = (
        (SequenceProto.TENSOR, "tensor_values", to_array),
        (SequenceProto.SPARSE_TENSOR, "sparse_tensor_values", to_array),
        (SequenceProto.SEQUENCE, "sequence_values", to_list),
        (SequenceProto.MAP, "map_values", to_dict),
    )
    for tag, field, convert in converters:
        if sequence.elem_type == tag:
            return [convert(value) for value in getattr(sequence, field)]  # type: ignore[arg-type]
    raise TypeError("The element type in the input sequence is not supported.")
def from_list(
    lst: list[Any], name: str | None = None, dtype: int | None = None
) -> SequenceProto:
    """Converts a list into a sequence def.

    Args:
        lst: a Python list
        name: (optional) the name of the sequence.
        dtype: (optional) type of element in the input list, used for specifying
                          sequence values when converting an empty list.

    Returns:
        SequenceProto: the converted sequence def.
    """
    sequence = SequenceProto()
    if name:
        sequence.name = name

    # Determine the element type: an explicit dtype wins, otherwise infer
    # from the first element; an empty list defaults to TENSOR.
    if dtype:
        elem_type = dtype
    elif lst:
        head = lst[0]
        if isinstance(head, dict):
            elem_type = SequenceProto.MAP
        elif isinstance(head, list):
            elem_type = SequenceProto.SEQUENCE
        else:
            elem_type = SequenceProto.TENSOR
    else:
        elem_type = SequenceProto.TENSOR
    sequence.elem_type = elem_type

    if lst and not all(isinstance(elem, type(lst[0])) for elem in lst):
        raise TypeError(
            "The element type in the input list is not the same "
            "for all elements and therefore is not supported as a sequence."
        )

    if elem_type == SequenceProto.TENSOR:
        sequence.tensor_values.extend(from_array(value) for value in lst)
    elif elem_type == SequenceProto.SEQUENCE:
        sequence.sequence_values.extend(from_list(value) for value in lst)
    elif elem_type == SequenceProto.MAP:
        sequence.map_values.extend(from_dict(value) for value in lst)
    else:
        raise TypeError(
            "The element type in the input list is not a tensor, "
            "sequence, or map and is not supported."
        )
    return sequence
def to_dict(map_proto: MapProto) -> dict[Any, Any]:
    """Converts a map def to a Python dictionary.

    Args:
        map_proto: a MapProto object.

    Returns:
        The converted dictionary.
    """
    # String keys and integer keys live in different proto fields.
    if map_proto.key_type == TensorProto.STRING:
        keys: list[Any] = list(map_proto.string_keys)
    else:
        keys = list(map_proto.keys)
    values = to_list(map_proto.values)
    if len(keys) != len(values):
        raise IndexError(
            "Length of keys and values for MapProto (map name: ",
            map_proto.name,
            ") are not the same.",
        )
    return dict(zip(keys, values))
def from_dict(dict_: dict[Any, Any], name: str | None = None) -> MapProto:
    """Converts a Python dictionary into a map def.

    Args:
        dict_: Python dictionary
        name: (optional) the name of the map.

    Returns:
        MapProto: the converted map def.
    """
    map_proto = MapProto()
    if name:
        map_proto.name = name

    keys = list(dict_)
    raw_key_type = np.result_type(keys[0])
    key_type = helper.np_dtype_to_tensor_dtype(raw_key_type)

    valid_key_int_types = [
        TensorProto.INT8,
        TensorProto.INT16,
        TensorProto.INT32,
        TensorProto.INT64,
        TensorProto.UINT8,
        TensorProto.UINT16,
        TensorProto.UINT32,
        TensorProto.UINT64,
    ]

    # Every key must resolve to the same numpy type as the first one.
    if any(
        np.result_type(key) != raw_key_type  # type: ignore[arg-type]
        for key in keys
    ):
        raise TypeError(
            "The key type in the input dictionary is not the same "
            "for all keys and therefore is not valid as a map."
        )

    # Every value must resolve to the same numpy type as the first one.
    values = list(dict_.values())
    raw_value_type = np.result_type(values[0])
    if any(np.result_type(val) != raw_value_type for val in values):
        raise TypeError(
            "The value type in the input dictionary is not the same "
            "for all values and therefore is not valid as a map."
        )

    value_seq = from_list(values)

    map_proto.key_type = key_type
    if key_type == TensorProto.STRING:
        map_proto.string_keys.extend(keys)
    elif key_type in valid_key_int_types:
        map_proto.keys.extend(keys)
    map_proto.values.CopyFrom(value_seq)
    return map_proto
def to_optional(optional: OptionalProto) -> Any | None:
    """Converts an optional def to a Python optional.

    Args:
        optional: an OptionalProto object.

    Returns:
        opt: the converted optional.
    """
    kind = optional.elem_type
    if kind == OptionalProto.UNDEFINED:
        return None
    # (elem_type tag, proto field holding the value, converter) triples.
    handlers = (
        (OptionalProto.TENSOR, "tensor_value", to_array),
        (OptionalProto.SPARSE_TENSOR, "sparse_tensor_value", to_array),
        (OptionalProto.SEQUENCE, "sequence_value", to_list),
        (OptionalProto.MAP, "map_value", to_dict),
        (OptionalProto.OPTIONAL, "optional_value", to_optional),
    )
    for tag, field, convert in handlers:
        if kind == tag:
            return convert(getattr(optional, field))  # type: ignore[arg-type]
    raise TypeError("The element type in the input optional is not supported.")
def from_optional(
    opt: Any | None, name: str | None = None, dtype: int | None = None
) -> OptionalProto:
    """Converts an optional value into a Optional def.

    Args:
        opt: a Python optional
        name: (optional) the name of the optional.
        dtype: (optional) type of element in the input, used for specifying
                          optional values when converting empty none. dtype must
                          be a valid OptionalProto.DataType value

    Returns:
        optional: the converted optional def.
    """
    # TODO: create a map and replace conditional branches
    optional = OptionalProto()
    if name:
        optional.name = name

    # An explicit dtype wins; otherwise infer the element type from the
    # Python type of the value.
    if dtype:
        # dtype must be a valid OptionalProto.DataType
        if dtype not in list(OptionalProto.DataType.values()):
            raise TypeError(f"{dtype} must be a valid OptionalProto.DataType.")
        elem_type = dtype
    elif isinstance(opt, dict):
        elem_type = OptionalProto.MAP
    elif isinstance(opt, list):
        elem_type = OptionalProto.SEQUENCE
    elif opt is None:
        elem_type = OptionalProto.UNDEFINED
    else:
        elem_type = OptionalProto.TENSOR
    optional.elem_type = elem_type

    if opt is None:
        return optional

    if elem_type == OptionalProto.TENSOR:
        optional.tensor_value.CopyFrom(from_array(opt))
    elif elem_type == OptionalProto.SEQUENCE:
        optional.sequence_value.CopyFrom(from_list(opt))
    elif elem_type == OptionalProto.MAP:
        optional.map_value.CopyFrom(from_dict(opt))
    else:
        raise TypeError(
            "The element type in the input is not a tensor, "
            "sequence, or map and is not supported."
        )
    return optional
def convert_endian(tensor: TensorProto) -> None:
    """Call to convert endianness of raw data in tensor.

    Args:
        tensor: TensorProto to be converted.
    """
    np_dtype = helper.tensor_dtype_to_np_dtype(tensor.data_type)
    # Reinterpret the raw bytes with the tensor's dtype, swap each element's
    # byte order, and write the swapped bytes back in place.
    swapped = np.frombuffer(tensor.raw_data, dtype=np_dtype).byteswap()
    tensor.raw_data = swapped.tobytes()
def create_random_int(
    input_shape: tuple[int], dtype: np.dtype, seed: int = 1
) -> np.ndarray:
    """Create random integer array for backend/test/case/node.

    Args:
        input_shape: The shape for the returned integer array.
        dtype: The NumPy data type for the returned integer array.
        seed: The seed for np.random.

    Returns:
        np.ndarray: Random integer array.

    Raises:
        TypeError: If ``dtype`` is not a supported integer type.
    """
    np.random.seed(seed)
    supported_dtypes = (
        np.uint8,
        np.uint16,
        np.uint32,
        np.uint64,
        np.int8,
        np.int16,
        np.int32,
        np.int64,
    )
    if dtype not in supported_dtypes:
        raise TypeError(f"{dtype} is not supported by create_random_int.")
    # the range of np.random.randint is int32; set a fixed boundary if overflow
    low = max(np.iinfo(dtype).min, np.iinfo(np.int32).min)
    high = min(np.iinfo(dtype).max, np.iinfo(np.int32).max)
    return np.random.randint(low, high, size=input_shape).astype(dtype)