"""Write Value Change Dump files.
This module provides :class:`VCDWriter` for writing VCD files.
"""
from datetime import datetime
from itertools import zip_longest
from numbers import Number
from types import TracebackType
from typing import (
IO,
Dict,
Generator,
Generic,
List,
Optional,
Sequence,
Set,
Tuple,
Type,
TypeVar,
Union,
)
from vcd.common import ScopeType, Timescale, TimescaleMagnitude, TimescaleUnit, VarType
class VCDPhaseError(Exception):
    """Indicating a :class:`VCDWriter` method was called in the wrong phase.

    For example, calling :meth:`register_var()` after :meth:`close()` will raise this
    exception.

    """
# Type aliases used in the annotations throughout this module.

# Normalized scope: one element per level of the hierarchy.
ScopeTuple = Tuple[str, ...]
# Scope as accepted from callers: a single string or a sequence of level names.
ScopeInput = Union[str, Sequence[str]]
# Timestamps may be passed as int or float.
TimeValue = Union[int, float]
# Accepted ways of specifying a timescale: Timescale object, (int, str) pair, or str.
TimescaleLike = Union[Timescale, Tuple[int, str], str]
# Size of a compound vector: a sequence of int sizes.
CompoundSize = Sequence[int]
# Variable size: a single int or a compound size.
VariableSize = Union[int, CompoundSize]
# Value types, one alias per kind of variable.
EventValue = Union[bool, int]
RealValue = Union[float, int]
ScalarValue = Union[int, bool, str, None]
StringValue = Union[str, None]
CompoundValue = Sequence[ScalarValue]
# Any value accepted for a variable, regardless of its kind.
VarValue = Union[EventValue, RealValue, ScalarValue, StringValue, CompoundValue]
class VCDWriter:
    """Value Change Dump writer.

    A VCD file captures time-ordered changes to the value of variables.

    :param file file: A file-like object to write the VCD data.
    :param timescale:
        Scale of the VCD timestamps. The timescale may be a :class:`Timescale`, a
        string, or a tuple containing an (int, str) pair.
    :type timescale: Timescale, str, tuple
    :param str date:
        Optional `$date` string used in the VCD header. When ``None``, the current
        local date and time is used.
    :param str comment: Optional `$comment` string used in the VCD header.
    :param str version: Optional `$version` string used in the VCD header.
    :param str default_scope_type: Scope type for scopes where :meth:`set_scope_type()`
        is not called explicitly.
    :param str scope_sep: Separator for scopes specified as strings.
    :param bool check_values:
        Passed as the *check* flag to the variables' formatting methods when values
        are changed or dumped.
    :param int init_timestamp: The initial timestamp. default=0
    :raises ValueError: for invalid timescale values

    """

    def __init__(
        self,
        file: IO[str],
        timescale: TimescaleLike = '1 us',
        date: Optional[str] = None,
        comment: str = '',
        version: str = '',
        default_scope_type: Union[ScopeType, str] = ScopeType.module,
        scope_sep: str = '.',
        check_values: bool = True,
        init_timestamp: TimeValue = 0,
    ) -> None:
        self._ofile = file
        # Header keyword -> text; empty values are skipped by _gen_header().
        self._header_keywords = {
            '$timescale': self._check_timescale(timescale),
            '$date': str(datetime.now()) if date is None else date,
            '$comment': comment,
            '$version': version,
        }
        self._default_scope_type = ScopeType(default_scope_type)
        self._scope_sep = scope_sep
        self._check_values = check_values
        # Phase flags: registering -> dumping (may be toggled) -> closed.
        self._registering = True
        self._closed = False
        self._dumping = True
        self._next_var_id: int = 1
        self._scope_var_strs: Dict[ScopeTuple, List[str]] = {}
        self._scope_var_names: Dict[ScopeTuple, Set[str]] = {}
        self._scope_types: Dict[ScopeTuple, ScopeType] = {}
        self._vars: List[Variable] = []
        self._timestamp = int(init_timestamp)
        # Last timestamp actually written to the file; None until the first one.
        self._last_dumped_ts: Optional[int] = None

    def set_scope_type(
        self, scope: ScopeInput, scope_type: Union[ScopeType, str]
    ) -> None:
        """Set the scope_type for a given scope.

        The scope's type may be set to one of the valid :class:`ScopeType` values. VCD
        viewer applications may display different scope types differently.

        :param scope: The scope to set the type of.
        :type scope: str or sequence of str
        :param str scope_type: A valid scope type string.
        :raises ValueError: for invalid `scope_type`

        """
        scope_type = ScopeType(scope_type)
        scope_tuple = self._get_scope_tuple(scope)
        self._scope_types[scope_tuple] = scope_type

    def _check_registration_phase(self) -> None:
        """Raise :class:`VCDPhaseError` unless variables may still be registered."""
        if self._closed:
            raise VCDPhaseError('Cannot register after close().')
        elif not self._registering:
            raise VCDPhaseError('Cannot register after time 0.')

    def register_var(
        self,
        scope: ScopeInput,
        name: str,
        var_type: Union[VarType, str],
        size: Optional[VariableSize] = None,
        init: VarValue = None,
    ) -> 'Variable':
        """Register a new VCD variable.

        All VCD variables must be registered prior to any value changes.

        :param scope: The hierarchical scope that the variable belongs within.
        :type scope: str or sequence of str
        :param str name: Name of the variable.
        :param VarType var_type: Type of the variable.
        :param size:
            Size, in bits, of the variable. The *size* may be expressed as an int or,
            for vector variable types, a tuple of int. When the size is expressed as a
            tuple, the *value* passed to :meth:`change()` must also be a tuple of same
            arity as the *size* tuple. Some variable types ('integer', 'real',
            'realtime', 'event', and 'string') have a default size and thus *size*
            may be ``None`` for those variable types.
        :type size: int or tuple(int) or None
        :param init: Optional initial value; the default depends on the variable kind
            ('x' for scalars and vectors).
        :raises VCDPhaseError: if registration has been finalized or the writer closed
        :raises ValueError: for invalid var_type, missing size, or invalid init value
        :raises TypeError: for an invalid scope type
        :raises KeyError: for duplicate var name
        :returns: :class:`Variable` instance appropriate for use with :meth:`change()`.

        """
        self._check_registration_phase()

        var_type = VarType(var_type)
        scope_tuple = self._get_scope_tuple(scope)

        scope_names = self._scope_var_names.setdefault(scope_tuple, set())
        if name in scope_names:
            raise KeyError(
                f'Duplicate var {name} in scope {self._scope_sep.join(scope_tuple)}'
            )

        if size is None:
            if var_type in [VarType.integer, VarType.real, VarType.realtime]:
                size = 64
            elif var_type in [VarType.event, VarType.string]:
                size = 1
            else:
                raise ValueError(f'Must supply size for {var_type} var_type')

        # The $var declaration carries a single width: the sum for compound sizes.
        if isinstance(size, Sequence):
            size = tuple(size)
            var_size = sum(size)
        else:
            var_size = size

        ident = _encode_identifier(self._next_var_id)
        var_str = f'$var {var_type} {var_size} {ident} {name} $end'
        var = self._create_var(ident, var_type, size, init)

        # Only alter state after the variable was built and its init value validated.
        self._vars.append(var)
        self._next_var_id += 1
        self._scope_var_strs.setdefault(scope_tuple, []).append(var_str)
        scope_names.add(name)
        return var

    @staticmethod
    def _create_var(
        ident: str, var_type: VarType, size: VariableSize, init: VarValue
    ) -> 'Variable':
        """Build the Variable subclass matching *var_type*/*size* and validate *init*.

        :raises ValueError: if *init* has the wrong type or is rejected by the
            variable's ``format_value()``.

        """
        var: Variable
        if var_type == VarType.string:
            if init is None:
                init = ''
            elif not isinstance(init, str):
                raise ValueError('string init value must be str')
            var = StringVariable(ident, var_type, size, init)
        elif var_type == VarType.event:
            if init is None:
                init = True
            elif not isinstance(init, (bool, int)):
                raise ValueError('event init value must be int, bool, or None')
            var = EventVariable(ident, var_type, size, init)
        elif var_type == VarType.real:
            if init is None:
                init = 0.0
            elif not isinstance(init, (float, int)):
                raise ValueError('real init value must be float, int, or None')
            var = RealVariable(ident, var_type, size, init)
        elif size == 1:
            if init is None:
                init = 'x'
            elif not isinstance(init, (int, bool, str)):
                raise ValueError('scalar init value must be int, bool, str, or None')
            var = ScalarVariable(ident, var_type, size, init)
        elif isinstance(size, tuple):
            if init is None:
                init = tuple('x' * len(size))
            elif not isinstance(init, Sequence):
                raise ValueError('compound init value must be a sequence')
            elif len(init) != len(size):
                raise ValueError('compound init value must be same length as size')
            elif not all(isinstance(v, (int, bool, str)) for v in init):
                raise ValueError('compound init values must be int, bool, or str')
            var = CompoundVectorVariable(ident, var_type, size, init)
        else:
            if init is None:
                init = 'x'
            elif not isinstance(init, (int, bool, str)):
                raise ValueError('vector init value must be int, bool, str, or None')
            var = VectorVariable(ident, var_type, size, init)
        var.format_value(init, check=True)
        return var

    def register_alias(self, scope: ScopeInput, name: str, var: 'Variable') -> None:
        """Register a variable alias.

        The same VCD identifier may be associated with multiple reference names ("$var"
        declarations). This method associates an existing :class:`Variable` instance
        with a different variable scope and/or name. The alias shares the same
        identifier, type, size, and value as the reference variable. Because the
        identifier is shared, calling :meth:`change()` with ``var`` changes the value
        of all associated reference names.

        :param scope: The hierarchical scope that the variable belongs within.
        :type scope: str or sequence of str
        :param str name: Name of the variable.
        :param Variable var: Existing variable to alias.
        :raises VCDPhaseError: if registration has been finalized or the writer closed
        :raises TypeError: for an invalid scope type
        :raises KeyError: for duplicate var name

        """
        self._check_registration_phase()

        scope_tuple = self._get_scope_tuple(scope)
        scope_names = self._scope_var_names.setdefault(scope_tuple, set())
        if name in scope_names:
            raise KeyError(
                f'Duplicate var {name} in scope {self._scope_sep.join(scope_tuple)}'
            )

        # Declare the same total width register_var() declared: compound sizes are
        # stored as a tuple on the variable and must be summed, not printed verbatim.
        size = var.size
        var_size = sum(size) if isinstance(size, Sequence) else size
        var_str = f'$var {var.type} {var_size} {var.ident} {name} $end'
        self._scope_var_strs.setdefault(scope_tuple, []).append(var_str)
        scope_names.add(name)

    def dump_off(self, timestamp: TimeValue) -> None:
        """Suspend dumping to VCD file.

        Finalizes registration if needed, then writes a ``$dumpoff`` section holding
        each variable's :meth:`Variable.dump_off()` value. Does nothing further if
        dumping is already suspended.

        :param int timestamp: Current simulation time.
        :raises VCDPhaseError: if the timestamp is out of order.

        """
        if self._registering:
            self._finalize_registration()
        self._set_timestamp(timestamp)
        if not self._dumping:
            return
        self._dump_timestamp()
        self._ofile.write('$dumpoff\n')
        for var in self._vars:
            val_str = var.dump_off()
            if val_str:
                self._ofile.write(val_str + '\n')
        self._ofile.write('$end\n')
        self._dumping = False

    def dump_on(self, timestamp: TimeValue) -> None:
        """Resume dumping to VCD file.

        Finalizes registration if needed, then writes a ``$dumpon`` section holding
        the current value of each variable. Does nothing further if dumping is
        already active.

        :param int timestamp: Current simulation time.
        :raises VCDPhaseError: if the timestamp is out of order.

        """
        if self._registering:
            self._finalize_registration()
        self._set_timestamp(timestamp)
        if self._dumping:
            return
        # Must be set before _dump_timestamp(), which only writes while dumping.
        self._dumping = True
        self._dump_timestamp()
        self._dump_values('$dumpon')

    def _dump_values(self, keyword: str) -> None:
        """Write a *keyword* ... ``$end`` section with every variable's dump value."""
        self._ofile.write(keyword + '\n')
        for var in self._vars:
            val_str = var.dump(self._check_values)
            if val_str:
                self._ofile.write(val_str + '\n')
        self._ofile.write('$end\n')

    def _set_timestamp(self, timestamp: TimeValue) -> None:
        """Advance the current timestamp; reject timestamps in the past."""
        if timestamp < self._timestamp:
            raise VCDPhaseError(f'Out of order timestamp: {timestamp}')
        elif timestamp > self._timestamp:
            self._timestamp = int(timestamp)

    def _dump_timestamp(self) -> None:
        """Write ``#<timestamp>`` unless that timestamp was already written.

        The very first timestamp is always written, even while dumping is suspended.

        """
        if (self._timestamp != self._last_dumped_ts and self._dumping) or (
            self._last_dumped_ts is None
        ):
            self._last_dumped_ts = self._timestamp
            self._ofile.write(f'#{self._timestamp}\n')

    def change(self, var: 'Variable', timestamp: TimeValue, value: VarValue) -> None:
        """Change variable's value in VCD stream.

        This is the fundamental behavior of a :class:`VCDWriter` instance. Each time a
        variable's value changes, this method should be called.

        The *timestamp* must be in-order relative to timestamps from previous calls to
        :meth:`change()`. It is okay to call :meth:`change()` multiple times with the
        same *timestamp*, but never with a past *timestamp*.

        .. Note::

            :meth:`change()` may be called multiple times before the timestamp
            progresses past the initial timestamp. The last value change for each
            variable will go into the $dumpvars section.

        :param Variable var: :class:`Variable` instance (i.e. from
                             :meth:`register_var()`).
        :param int timestamp: Current simulation time.
        :param value:
            New value for *var*. For :class:`VectorVariable`, if the variable's *size*
            is a tuple, then *value* must be a tuple of the same arity.
        :raises ValueError: if the value is not valid for *var*.
        :raises VCDPhaseError: if the timestamp is out of order or the
                               :class:`VCDWriter` instance is closed.

        """
        if self._closed:
            raise VCDPhaseError('Cannot change value after close()')

        # Format value early to catch any errors before writing output. Unchanged
        # values are skipped, except for events which are always emitted.
        if value != var.value or var.type == VarType.event:
            val_str = var.format_value(value, self._check_values)
        else:
            val_str = ''

        # Unroll for performance: self._set_timestamp(timestamp)
        if timestamp < self._timestamp:
            raise VCDPhaseError(f'Out of order timestamp: {timestamp}')
        elif timestamp > self._timestamp:
            # First advance past the initial timestamp ends the registration phase.
            if self._registering:
                self._finalize_registration()
            self._timestamp = int(timestamp)

        if not val_str:
            return

        var.value = value

        # While still registering, or while dumping is suspended, the value is only
        # remembered; it is emitted later by $dumpvars / $dumpon.
        if self._dumping and not self._registering:
            # Unroll for performance: self._dump_timestamp()
            if self._timestamp != self._last_dumped_ts:
                self._last_dumped_ts = self._timestamp
                self._ofile.write(f'#{self._timestamp}\n{val_str}\n')
            else:
                self._ofile.write(f'{val_str}\n')

    def _get_scope_tuple(self, scope: ScopeInput) -> ScopeTuple:
        """Normalize *scope* to a tuple, splitting strings on the scope separator.

        :raises TypeError: if *scope* is neither a string nor a sequence.

        """
        if isinstance(scope, str):
            return tuple(scope.split(self._scope_sep))
        if isinstance(scope, Sequence):
            return tuple(scope)
        raise TypeError(f'Invalid scope {scope}')

    @classmethod
    def _check_timescale(cls, timescale: TimescaleLike) -> str:
        """Validate *timescale* and return it as the string for ``$timescale``.

        :raises ValueError: for a list/tuple that does not have exactly two items.
        :raises TypeError: for an unsupported *timescale* type.

        """
        if isinstance(timescale, Timescale):
            return str(timescale)
        elif isinstance(timescale, (list, tuple)):
            if len(timescale) != 2:
                raise ValueError(f'Invalid timescale {timescale}')
            mag, unit = timescale
            return str(Timescale(TimescaleMagnitude(mag), TimescaleUnit(unit)))
        elif isinstance(timescale, str):
            return str(Timescale.from_str(timescale))
        else:
            raise TypeError(f'Invalid timescale type {type(timescale).__name__}')

    def __enter__(self) -> 'VCDWriter':
        """Return the writer itself for use as a context manager."""
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Close the writer on leaving the ``with`` block; exceptions propagate."""
        self.close()

    def close(self, timestamp: Optional[TimeValue] = None) -> None:
        """Close VCD writer.

        Any buffered VCD data is flushed to the output file. After :meth:`close()`, no
        variable registration or value changes will be accepted. Calling
        :meth:`close()` again has no effect.

        :param int timestamp: optional final timestamp to insert into VCD stream.

        .. Note::

            The output file is not automatically closed. It is up to the user to ensure
            the output file is closed after the :class:`VCDWriter` instance is closed.

        """
        if not self._closed:
            self.flush(timestamp)
            self._closed = True

    def flush(self, timestamp: Optional[TimeValue] = None) -> None:
        """Flush any buffered VCD data to output file.

        If the VCD header has not already been written, calling `flush()` will force the
        header to be written thus disallowing any further variable registration.

        :param int timestamp: optional timestamp to insert into VCD stream.
        :raises VCDPhaseError: if the writer is closed or the timestamp is out of order.

        """
        if self._closed:
            raise VCDPhaseError('Cannot flush() after close()')
        if self._registering:
            self._finalize_registration()
        if timestamp is not None:
            self._set_timestamp(timestamp)
            self._dump_timestamp()
        self._ofile.flush()

    def _gen_header(self) -> Generator[str, None, None]:
        """Yield the header lines: keywords, scope/var declarations, end marker.

        Consumes ``self._scope_var_strs`` (entries are popped as they are emitted).

        """
        for kwname, kwvalue in sorted(self._header_keywords.items()):
            if not kwvalue:
                continue
            lines = kwvalue.split('\n')
            if len(lines) == 1:
                yield f'{kwname} {lines[0]} $end'
            else:
                yield kwname
                for line in lines:
                    yield '\t' + line
                yield '$end'

        prev_scope: ScopeTuple = ()
        for scope in sorted(self._scope_var_strs):
            var_strs = self._scope_var_strs.pop(scope)

            # Find the first level where this scope diverges from the previous one,
            # close the previous scope back to that level, then open the new levels.
            for i, (prev, this) in enumerate(zip_longest(prev_scope, scope)):
                if prev != this:
                    for _ in prev_scope[i:]:
                        yield '$upscope $end'

                    for j, name in enumerate(scope[i:]):
                        scope_type = self._scope_types.get(
                            scope[: i + j + 1], self._default_scope_type
                        )
                        yield f'$scope {scope_type.value} {name} $end'
                    break
            else:
                assert scope != prev_scope  # pragma no cover

            yield from var_strs

            prev_scope = scope

        for _ in prev_scope:
            yield '$upscope $end'

        yield '$enddefinitions $end'

    def _finalize_registration(self) -> None:
        """Write the header and initial ``$dumpvars``; end the registration phase."""
        assert self._registering
        self._ofile.write('\n'.join(self._gen_header()) + '\n')
        if self._vars:
            self._dump_timestamp()
            self._dump_values('$dumpvars')
        self._registering = False

        # This state is not needed after registration phase.
        self._header_keywords.clear()
        self._scope_types.clear()
        self._scope_var_names.clear()
# Type of value held by a particular Variable subclass.
ValueType = TypeVar('ValueType')


class Variable(Generic[ValueType]):
    """VCD variable details needed to call :meth:`VCDWriter.change()`."""

    __slots__ = ('ident', 'type', 'size', 'value')

    def __init__(self, ident: str, type: VarType, size: VariableSize, init: ValueType):
        #: Identifier used in VCD output stream.
        self.ident = ident
        #: VCD variable type; a :class:`VarType` value.
        self.type = type
        #: Size, in bits, of variable.
        self.size = size
        #: Last value of variable.
        self.value = init

    def format_value(self, value: ValueType, check: bool = True) -> str:
        """Format *value* as a value change for the VCD stream.

        Subclasses must override this; :meth:`dump()` relies on it.

        :param value: value to format.
        :param bool check: whether to validate *value*.
        :raises NotImplementedError: always, in this base class.

        """
        raise NotImplementedError

    def dump(self, check: bool = True) -> Optional[str]:
        """Return the variable's current value formatted by :meth:`format_value()`."""
        return self.format_value(self.value, check)

    def dump_off(self) -> Optional[str]:
        """Return the string to emit while dumping is off; ``None`` for nothing."""
        return None
class ScalarVariable(Variable[ScalarValue]):
    """One-bit VCD scalar.

    This is a 4-state variable and thus may have values of 0, 1, 'z', or 'x'.

    """

    __slots__ = ()

    # NOTE(review): no format_value() override is visible in this listing; one is
    # needed for register_var()/change() to work with scalars -- confirm against
    # the complete source.

    def dump_off(self) -> str:
        """Return the unknown-state ('x') value string for this variable."""
        return 'x' + self.ident
class EventVariable(Variable[EventValue]):
    """VCD event variable.

    An event is transient--it only exists at the time it is changed.

    """

    # Keep instances dict-free, like the other Variable subclasses.
    __slots__ = ()

    def format_value(self, value: EventValue, check: bool = True) -> str:
        """Format an event occurrence for the VCD stream.

        :param value: must be truthy; an event cannot be "cleared".
        :param bool check: unused; the value is always validated.
        :raises ValueError: if *value* is falsy.
        :returns: string representing the event for use in a VCD stream.

        """
        if value:
            return '1' + self.ident
        else:
            raise ValueError('Invalid event value')

    def dump(self, check: bool = True) -> Optional[str]:
        """Return ``None``: events have no current value to dump."""
        return None
class StringVariable(Variable[StringValue]):
    """String variable as known by GTKWave.

    Any "string" (character-chain) can be displayed as a change. This type is only
    supported by GTKWave.

    """

    __slots__ = ()

    # Code point -> escape sequence for str.translate(). Built once rather than on
    # every call. Tab, newline, carriage return, space and backslash are escaped.
    _ESCAPES = {
        9: "\\t",
        10: "\\n",
        13: "\\r",
        32: "\\x20",
        92: "\\\\",
    }

    def format_value(self, value: StringValue, check: bool = True) -> str:
        """Format string value change for VCD stream.

        :param value: a string, or ``None`` for the empty string.
        :type value: str or None
        :param bool check: whether to verify that *value* is a ``str``.
        :raises ValueError: for invalid *value*.
        :returns: string representing value change for use in a VCD stream.

        """
        if value is None:
            value_str = ''
        elif check and not isinstance(value, str):
            raise ValueError(f'Invalid string value ({value!r})')
        else:
            value_str = value.translate(self._ESCAPES)
        return f's{value_str} {self.ident}'
class RealVariable(Variable[RealValue]):
    """Real (IEEE-754 double-precision floating point) variable.

    Values must be numeric and cannot be 'x' or 'z' states.

    """

    __slots__ = ()

    # NOTE(review): no format_value() override is visible in this listing; one is
    # needed for register_var()/change() to work with real variables -- confirm
    # against the complete source.
class VectorVariable(Variable[ScalarValue]):
    """Bit vector variable type.

    This is for the various non-scalar and non-real variable types including integer,
    register, wire, etc.

    """

    __slots__ = ()

    # Narrows the base class attribute: a plain vector has a single int size.
    size: int

    # NOTE(review): no format_value() override is visible in this listing, yet
    # dump_off() below calls it -- confirm against the complete source.

    def dump_off(self) -> str:
        """Return the variable formatted with the unknown value 'x'."""
        return self.format_value('x', check=False)
class CompoundVectorVariable(Variable[CompoundValue]):
    """Bit vector variable type with a compound size.

    This is for the various non-scalar and non-real variable types including integer,
    register, wire, etc.

    """

    __slots__ = ()

    # Narrows the base class attribute: a compound vector has a sequence of sizes.
    size: CompoundSize

    # NOTE(review): no format_value() override is visible in this listing, yet
    # dump_off() below calls it -- confirm against the complete source.

    def dump_off(self) -> str:
        """Return the variable formatted with 'x' for every element of its size."""
        return self.format_value(tuple('x' * len(self.size)), check=False)
def _format_scalar_value(value: ScalarValue, size: int, check: bool) -> str:
if isinstance(value, int):
max_val = 1 << size
if check and (-value > (max_val >> 1) or value >= max_val):
raise ValueError(f'Value ({value}) not representable in {size} bits')
if value < 0:
value += max_val
return format(value, 'b')
elif value is None:
return 'z'
else:
if check and (
not isinstance(value, str)
or len(value) > size
or any(c not in '01xzXZ-' for c in value)
):
raise ValueError(f'Invalid vector value ({value})')
return value
def _encode_identifier(v: int) -> str:
"""Encode identifer value into base-94 string."""
assert v > 0, 'identifier codes must be > 0'
encoded = ''
while v != 0:
v -= 1
encoded += chr((v % 94) + 33)
v //= 94
return encoded