'''Read Value Change Dump (VCD) files.

The primary interface is the :func:`tokenize()` generator function, which
parses a binary VCD stream, yielding tokens as they are encountered.

.. code::

    >>> import io
    >>> from vcd.reader import TokenKind, tokenize
    >>> vcd = b"$date today $end $timescale 1 ns $end"
    >>> tokens = tokenize(io.BytesIO(vcd))
    >>> token = next(tokens)
    >>> assert token.kind is TokenKind.DATE
    >>> assert token.date == 'today'
    >>> token = next(tokens)
    >>> assert token.kind is TokenKind.TIMESCALE
    >>> assert token.timescale.magnitude.value == 1
    >>> assert token.timescale.unit.value == 'ns'

'''
import io
from dataclasses import dataclass
from enum import Enum
from typing import Iterator, List, NamedTuple, Optional, Tuple, Union
from vcd.common import ScopeType, Timescale, TimescaleMagnitude, TimescaleUnit, VarType
class TokenKind(Enum):
    """Kinds of VCD tokens.

    Each member identifies one kind of token produced by the tokenizer;
    the member decides which payload type a token's ``data`` carries.
    """

    # ``$keyword`` declarations that carry a payload or delimit the header.
    COMMENT = 1
    DATE = 2
    ENDDEFINITIONS = 3
    SCOPE = 4
    TIMESCALE = 5
    UPSCOPE = 6
    VAR = 7
    VERSION = 8

    # ``$keyword`` commands that are emitted without any payload.
    DUMPALL = 9
    DUMPOFF = 10
    DUMPON = 11
    DUMPVARS = 12
    END = 13

    # Value change descriptors.
    CHANGE_TIME = 14
    CHANGE_SCALAR = 15
    CHANGE_VECTOR = 16
    CHANGE_REAL = 17
    CHANGE_STRING = 18
class VarDecl(NamedTuple):
    """VCD variable declaration.

    Examples::

        $var wire 4 !@# foobar [ 3 : 1 ] $end
        $var real 1 aaa foobar $end
        $var integer 32 > foobar[8] $end

    """

    type_: VarType  #: Type of variable
    size: int  #: Size, in bits, of variable
    id_code: str
    """Identifier code of variable.

    This code is used in subsequent value change descriptors
    to map-back to this variable declaration."""
    reference: str
    """Reference name of variable.

    This human-readable name typically corresponds to the name of a
    variable in the model that output the VCD."""
    bit_index: Union[None, int, Tuple[int, int]]
    """Optional range of bits to select from the variable.

    May select a single bit index, e.g. ``ref [ 3 ]``. Or a range of
    bits, e.g. from ``ref [ 7 : 3 ]`` (MSB index then LSB index)."""

    @property
    def ref_str(self) -> str:
        """Reference name with the bit selection, if any, appended.

        Returns ``reference`` alone when :attr:`bit_index` is ``None``,
        ``reference[i]`` for a single index, and ``reference[a:b]`` for a
        two-element index tuple.
        """
        if self.bit_index is None:
            return self.reference
        elif isinstance(self.bit_index, int):
            return f'{self.reference}[{self.bit_index}]'
        else:
            return f'{self.reference}[{self.bit_index[0]}:{self.bit_index[1]}]'
class ScopeDecl(NamedTuple):
    """VCD scope declaration.

    Examples::

        $scope module Foo $end
        $scope
           fork alpha_beta
        $end

    """

    type_: ScopeType  #: Type of scope
    ident: str  #: Scope name
class VectorChange(NamedTuple):
    """Vector value change descriptor.

    A vector value consists of multiple 4-state values, where the four
    states are 0, 1, X, and Z. When a vector value consists entirely
    of 0 and 1 states, :attr:`value` will be an int. Otherwise
    :attr:`value` will be a str.

    """

    id_code: str  #: Identifier code of associated variable.
    value: Union[int, str]  #: New value of associated vector variable.
class RealChange(NamedTuple):
    """Real value (floating point) change descriptor."""

    id_code: str  #: Identifier code of associated variable.
    value: float  #: New value of associated real variable.
class ScalarChange(NamedTuple):
    """Scalar value change descriptor.

    A scalar is a single 4-state value. The value is one of '0', '1',
    'X', or 'Z'.

    """

    id_code: str  #: Identifier code of associated variable.
    value: str  #: New value of associated scalar variable.
class StringChange(NamedTuple):
    """String value change descriptor.

    Strings are a VCD extension supported by GTKWave.

    """

    id_code: str  #: Identifier code of associated variable.
    value: str  #: New value of associated string variable.
class Location(NamedTuple):
    """Describe location within VCD stream/file."""

    line: int  #: Line number
    column: int  #: Column number
class Span(NamedTuple):
    """Describe location span within VCD stream/file."""

    start: Location  #: Start of span
    end: Location  #: End of span
class Token(NamedTuple):
    """VCD token yielded from :func:`tokenize()`.

    These are relatively high-level tokens insofar as each token fully
    captures an entire VCD declaration, command, or change descriptor.

    The :attr:`kind` attribute determines the :attr:`data` type. Various
    kind-specific properties provide runtime type-checked access to the
    kind-specific data.

    .. Note::

        The :attr:`data` attribute may be accessed directly to avoid
        runtime type checks and thus achieve better runtime performance
        versus accessing kind-specific properties such as
        :attr:`scalar_change`.

    """

    kind: TokenKind
    "The kind of token."

    span: Span
    "The start and end location of the token within the file/stream."

    data: Union[
        None,  # $enddefinitions $upscope $dump* $end
        int,  # time change
        str,  # $comment, $date, $version
        ScopeDecl,  # $scope
        Timescale,  # $timescale
        VarDecl,  # $var
        ScalarChange,
        VectorChange,
        RealChange,
        StringChange,
    ]
    "Data associated with the token. The data type depends on :attr:`kind`."

    # Each accessor below checks the token kind and payload type with
    # ``assert`` statements, so the checks vanish when Python runs with -O.

    @property
    def comment(self) -> str:
        """Unstructured text from a ``$comment`` declaration."""
        assert self.kind is TokenKind.COMMENT
        assert isinstance(self.data, str)
        return self.data

    @property
    def date(self) -> str:
        """Unstructured text from a ``$date`` declaration."""
        assert self.kind is TokenKind.DATE
        assert isinstance(self.data, str)
        return self.data

    @property
    def scope(self) -> ScopeDecl:
        """Scope type and identifier from ``$scope`` declaration."""
        assert self.kind is TokenKind.SCOPE
        assert isinstance(self.data, ScopeDecl)
        return self.data

    @property
    def timescale(self) -> Timescale:
        """Magnitude and unit from ``$timescale`` declaration."""
        assert self.kind is TokenKind.TIMESCALE
        assert isinstance(self.data, Timescale)
        return self.data

    @property
    def var(self) -> VarDecl:
        """Details from a ``$var`` declaration."""
        assert self.kind is TokenKind.VAR
        assert isinstance(self.data, VarDecl)
        return self.data

    @property
    def version(self) -> str:
        """Unstructured text from a ``$version`` declaration."""
        assert self.kind is TokenKind.VERSION
        assert isinstance(self.data, str)
        return self.data

    @property
    def time_change(self) -> int:
        """Simulation time change."""
        assert self.kind is TokenKind.CHANGE_TIME
        assert isinstance(self.data, int)
        return self.data

    @property
    def scalar_change(self) -> ScalarChange:
        """Scalar value change descriptor."""
        assert self.kind is TokenKind.CHANGE_SCALAR
        assert isinstance(self.data, ScalarChange)
        return self.data

    @property
    def vector_change(self) -> VectorChange:
        """Vector value change descriptor."""
        assert self.kind is TokenKind.CHANGE_VECTOR
        assert isinstance(self.data, VectorChange)
        return self.data

    @property
    def real_change(self) -> RealChange:
        """Real (float) value change descriptor."""
        assert self.kind is TokenKind.CHANGE_REAL
        assert isinstance(self.data, RealChange)
        return self.data

    @property
    def string_change(self) -> StringChange:
        """String value change descriptor."""
        assert self.kind is TokenKind.CHANGE_STRING
        assert isinstance(self.data, StringChange)
        return self.data
class VCDParseError(Exception):
    """Catch-all error for any VCD parsing errors.

    The exception message is *msg* prefixed with ``line:column:`` taken
    from *loc*.
    """

    def __init__(self, loc: Location, msg: str) -> None:
        super().__init__(f'{loc.line}:{loc.column}: {msg}')
        self.loc = loc
        "Location within VCD file where error was detected."
# Binary stream types accepted by the tokenizer; the tokenizer state only
# ever calls ``readinto()`` on the stream.
HasReadinto = Union[io.BufferedIOBase, io.RawIOBase]
def tokenize(stream: HasReadinto, buf_size: Optional[int] = None) -> Iterator[Token]:
    """Parse VCD stream into tokens.

    The input stream must be opened in binary mode. E.g. with ``open(path, 'rb')``.

    :param stream: Binary stream to read from; it is consumed via ``readinto()``.
    :param buf_size:
        Size, in bytes, of the internal read buffer. Defaults to
        :data:`io.DEFAULT_BUFFER_SIZE` when ``None``.
    :raises ValueError:
        If *buf_size* is less than 1. Because this is a generator, the
        error surfaces when the first token is requested.
    :raises VCDParseError: If the stream contents cannot be parsed.
    :returns: Iterator yielding one :class:`Token` at a time.

    """
    if buf_size is None:
        buf_size = io.DEFAULT_BUFFER_SIZE
    elif buf_size < 1:
        # A zero-length buffer makes readinto() return 0, which is
        # indistinguishable from end-of-stream and would silently yield nothing.
        raise ValueError(f'buf_size must be at least 1, got {buf_size}')

    s = _TokenizerState(stream, bytearray(buf_size))

    try:
        while True:
            # Step onto the first byte of the next token (or past the byte
            # that terminated the previous one).
            s.advance()
            yield _parse_token(s)
    except StopIteration:
        # The tokenizer state raises StopIteration when the stream is
        # exhausted; catch it here so it ends the generator normally.
        return
@dataclass
class _TokenizerState:
    """Mutable cursor used to walk a binary VCD stream byte by byte.

    Bytes are pulled from :attr:`stream` into :attr:`buf` with ``readinto()``
    and consumed one at a time. The ``take_*`` helpers begin at the current
    byte (``buf[pos]``) and operate on integer byte values.
    """

    stream: HasReadinto  # Source of bytes; only readinto() is called on it.
    buf: bytearray  # Read buffer, refilled in place by advance().
    pos: int = 0  # Index of the current byte within buf.
    end: int = 0  # Index of the last valid byte within buf.
    lineno: int = 1  # Line number of the current byte.
    column: int = 0  # Column of the current byte; 0 before the first advance().

    @property
    def loc(self) -> Location:
        """Location (line, column) of the current byte."""
        return Location(self.lineno, self.column)

    def span(self, start: Location) -> Span:
        """Return the span from *start* to the current location."""
        return Span(start, self.loc)

    def advance(self, raise_on_eof: bool = True) -> int:
        """Move to the next byte, refilling the buffer when it is used up.

        :returns:
            The new current byte. When the stream is exhausted and
            *raise_on_eof* is false, 0 is returned and the cursor stays on
            the last byte that was read.
        :raises StopIteration:
            When the stream is exhausted and *raise_on_eof* is true.
        """
        if self.pos < self.end:
            self.pos += 1
        else:
            n = self.stream.readinto(self.buf)
            if n:
                self.end = n - 1
                self.pos = 0
            elif raise_on_eof:
                raise StopIteration()
            else:
                return 0
        c = self.buf[self.pos]
        if c == 10:  # '\n'
            self.lineno += 1
            # NOTE(review): the very first byte of the stream gets column 1,
            # but with this reset the first byte after a newline gets
            # column 2. Left unchanged because token spans and error
            # messages expose these values -- confirm the intended convention.
            self.column = 1
        else:
            self.column += 1
        return self.buf[self.pos]

    def skip_ws(self) -> int:
        """Advance past whitespace and return the first non-whitespace byte.

        The current byte is returned unchanged when it is not whitespace.
        """
        c = self.buf[self.pos]
        while c == 32 or 9 <= c <= 13:
            c = self.advance()
        return c

    def take_ws_after_kw(self, kw: str) -> None:
        """Consume the single whitespace byte required after keyword *kw*.

        :raises VCDParseError: If the current byte is not whitespace.
        """
        if _is_ws(self.buf[self.pos]):
            self.advance()
        else:
            raise VCDParseError(self.loc, f'Expected whitespace after identifier ${kw}')

    def take_decimal(self) -> int:
        """Consume a run of ASCII digits and return it as an int.

        :raises VCDParseError: If the current byte is not a digit.
        """
        digits = []
        c = self.buf[self.pos]
        while 48 <= c <= 57:  # '0' <= c <= '9'
            digits.append(c)
            # A number may legitimately be the last thing in the stream.
            c = self.advance(raise_on_eof=False)
        if digits:
            return int(bytes(digits))
        else:
            raise VCDParseError(self.loc, 'Expected decimal value')

    def take_id_code(self) -> str:
        """Consume a run of printable, non-space ASCII bytes as an id code.

        :raises VCDParseError: If the current byte is not printable.
        """
        printables = []
        c = self.buf[self.pos]
        while 33 <= c <= 126:  # printable character
            printables.append(c)
            # An id code may legitimately be the last thing in the stream.
            c = self.advance(raise_on_eof=False)
        if printables:
            return bytes(printables).decode('ascii')
        else:
            raise VCDParseError(self.loc, 'Expected id code')

    def take_identifier(self) -> str:
        """Consume a simple or backslash-escaped identifier.

        :raises VCDParseError:
            If the current byte can start neither kind of identifier.
        """
        c = self.buf[self.pos]
        # Simple identifiers must start with letter or underscore
        if (
            65 <= c <= 90  # 'A' <= c <= 'Z'
            or 97 <= c <= 122  # 'a' - 'z'
            or c == 95  # '_'
        ):
            identifier = self.take_simple_identifier()
        elif c == 92:  # '\'
            identifier = self.take_escaped_identifier()
        else:
            raise VCDParseError(self.loc, 'Simple identifier must start with a-zA-Z_')
        return bytes(identifier).decode('ascii')

    def take_simple_identifier(self) -> List[int]:
        """Consume a simple identifier and return its bytes.

        The current byte is taken unconditionally as the first character;
        the caller has already validated it.
        """
        identifier = [self.buf[self.pos]]
        c = self.advance()
        while (
            48 <= c <= 57  # '0' - '9'
            or 65 <= c <= 90  # 'A' - 'Z'
            or 97 <= c <= 122  # 'a' - 'z'
            or c == 95  # '_'
            or c == 36  # '$'
            or c == 46  # '.' not in spec, but seen in the wild
            or c == 40  # '(' - produced by cva6 core
            or c == 41  # ')' - produced by cva6 core
        ):
            identifier.append(c)
            c = self.advance(raise_on_eof=False)
        return identifier

    def take_escaped_identifier(self) -> List[int]:
        """Consume an escaped identifier and return its bytes.

        The leading backslash (the current byte) is dropped. The identifier
        runs up to, but not including, the next whitespace byte.

        :raises VCDParseError:
            If a byte outside the printable ASCII range is encountered.
        """
        identifier = []
        c = self.advance()
        # Stop at any byte the tokenizer treats as whitespace. This includes
        # '\r', so an escaped identifier at the end of a CRLF-terminated
        # line is accepted rather than rejected as "non-printable".
        while not (c == 32 or 9 <= c <= 13):
            if c < 33 or c > 126:  # printable ASCII characters
                raise VCDParseError(
                    self.loc,
                    'Escaped identifier can only contain printable ASCII characters',
                )
            identifier.append(c)
            c = self.advance()
        return identifier

    def take_bit_index(self) -> Union[int, Tuple[int, int]]:
        """Consume a bit index of the form ``n ]`` or ``m : n ]``.

        The opening ``[`` must already have been consumed by the caller.

        :returns: A single index, or an ``(index0, index1)`` tuple.
        :raises VCDParseError:
            If a decimal value or the closing ``]`` is missing.
        """
        self.skip_ws()
        index0 = self.take_decimal()
        index1: Optional[int]

        c = self.skip_ws()
        if c == 58:  # ':'
            self.advance()
            self.skip_ws()
            index1 = self.take_decimal()
        else:
            index1 = None

        c = self.skip_ws()
        if c == 93:  # ']'
            self.advance(raise_on_eof=False)
            if index1 is None:
                return index0
            else:
                return (index0, index1)
        else:
            raise VCDParseError(self.loc, 'Expected bit index to terminate with "]"')

    def take_to_end(self) -> str:
        """Consume everything through the next ``$end`` and return the text.

        The returned text starts at the current byte and excludes both the
        ``$end`` keyword and the single whitespace byte in front of it. An
        empty string is returned when ``$end`` starts at the current byte.

        :raises VCDParseError:
            If ``$end`` is not preceded by whitespace.
        """
        # Prime the window with four bytes so the "$end" check below can
        # always look at chars[-4:].
        chars = [
            self.buf[self.pos],
            self.advance(),
            self.advance(),
            self.advance(),
        ]

        while not (  # Check for 'd' 'n' 'e' '$'
            chars[-1] == 100
            and chars[-2] == 110
            and chars[-3] == 101
            and chars[-4] == 36
        ):
            chars.append(self.advance())

        if len(chars) > 4 and not _is_ws(chars[-5]):
            # Point at the byte just before "$end" (len(chars) >= 5 here).
            loc = Location(self.lineno, self.column - 5)
            raise VCDParseError(loc, 'Expected whitespace before $end')

        return bytes(chars[:-5]).decode('ascii')

    def take_end(self) -> None:
        """Skip whitespace and consume the ``$end`` keyword.

        :raises VCDParseError: If the next four bytes are not ``$end``.
        """
        if (
            self.skip_ws() != 36  # '$'
            or self.advance() != 101  # 'e'
            or self.advance() != 110  # 'n'
            or self.advance() != 100  # 'd'
        ):
            raise VCDParseError(self.loc, 'Expected $end')
def _is_ws(c: int) -> bool:
return c == 32 or 9 <= c <= 13
def _parse_token(s: _TokenizerState) -> Token:
    """Parse one complete token beginning at the current position of *s*.

    Leading whitespace is skipped first; the token span starts at the first
    non-whitespace byte.

    :raises VCDParseError: If the input does not form a valid token.
    :raises StopIteration:
        Propagated from ``s.advance()`` when the stream ends in the middle
        of a token.
    """
    c = s.skip_ws()
    start = s.loc
    if c == 35:  # '#'
        # Parse time change
        s.advance()
        time = s.take_decimal()
        return Token(TokenKind.CHANGE_TIME, s.span(start), time)
    elif c == 48 or c == 49 or c == 122 or c == 90 or c == 120 or c == 88:
        # c in '01zZxX'
        # Parse scalar change
        scalar_value = chr(c)
        s.advance()
        id_code = s.take_id_code()
        return Token(
            TokenKind.CHANGE_SCALAR, s.span(start), ScalarChange(id_code, scalar_value)
        )
    elif c == 66 or c == 98:  # 'B' or 'b'
        # Parse vector change
        vector = []
        c = s.advance()
        while c == 48 or c == 49:  # '0' or '1'
            vector.append(c)
            c = s.advance()
        vector_value: Union[int, str]
        if c == 122 or c == 90 or c == 120 or c == 88:  # c in 'zZxX'
            # Contains a non-binary state: keep the value as text.
            vector.append(c)
            c = s.advance()
            while (
                c == 48 or c == 49 or c == 122 or c == 90 or c == 120 or c == 88
            ):  # c in '01zZxX'
                vector.append(c)
                c = s.advance()
            vector_value = bytes(vector).decode('ascii')
        elif vector:
            vector_value = int(bytes(vector), 2)
        else:
            # No value characters at all; report this as a parse error
            # instead of letting int(b'', 2) raise a bare ValueError.
            raise VCDParseError(s.loc, 'Expected binary vector value')
        if not _is_ws(c):
            raise VCDParseError(s.loc, 'Expected whitespace after vector value')
        s.skip_ws()
        id_code = s.take_id_code()
        return Token(
            TokenKind.CHANGE_VECTOR, s.span(start), VectorChange(id_code, vector_value)
        )
    elif c == 82 or c == 114:  # 'R' or 'r'
        # Parse real change
        real_digits = []
        c = s.advance()
        while not _is_ws(c):
            real_digits.append(c)
            c = s.advance()
        try:
            real = float(bytes(real_digits))
        except ValueError:
            # Decode leniently so non-ASCII input still yields a
            # VCDParseError rather than a UnicodeDecodeError from here.
            real_str = bytes(real_digits).decode('ascii', 'replace')
            raise VCDParseError(start, f'Expected real value, got: {real_str}')
        s.skip_ws()
        id_code = s.take_id_code()
        return Token(TokenKind.CHANGE_REAL, s.span(start), RealChange(id_code, real))
    elif c == 83 or c == 115:  # 'S' or 's'
        # Parse string change
        chars = []
        c = s.advance()
        while not _is_ws(c):
            chars.append(c)
            c = s.advance()
        s.skip_ws()
        id_code = s.take_id_code()
        string_value = bytes(chars).decode('ascii')
        return Token(
            TokenKind.CHANGE_STRING, s.span(start), StringChange(id_code, string_value)
        )
    elif c == 36:  # '$'
        # Parse a keyword declaration or command.
        s.advance()
        kw = s.take_identifier()

        if kw == 'comment':
            s.take_ws_after_kw(kw)
            comment = s.take_to_end()
            return Token(TokenKind.COMMENT, s.span(start), comment)
        elif kw == 'date':
            s.take_ws_after_kw(kw)
            date_str = s.take_to_end()
            return Token(TokenKind.DATE, s.span(start), date_str)
        elif kw == 'enddefinitions':
            s.take_ws_after_kw(kw)
            s.take_end()
            return Token(TokenKind.ENDDEFINITIONS, s.span(start), None)
        elif kw == 'scope':
            s.take_ws_after_kw(kw)
            s.skip_ws()
            identifier = s.take_identifier()
            try:
                scope_type = ScopeType(identifier)
            except ValueError:
                raise VCDParseError(s.loc, f'Invalid $scope type: {identifier}')
            s.skip_ws()
            scope_ident = s.take_identifier()
            s.take_end()
            scope_decl = ScopeDecl(scope_type, scope_ident)
            return Token(TokenKind.SCOPE, s.span(start), scope_decl)
        elif kw == 'timescale':
            s.take_ws_after_kw(kw)
            s.skip_ws()
            mag_int = s.take_decimal()
            try:
                magnitude = TimescaleMagnitude(mag_int)
            except ValueError:
                valid_magnitudes = ', '.join(str(m.value) for m in TimescaleMagnitude)
                raise VCDParseError(
                    s.loc,
                    f'Invalid $timescale magnitude: {mag_int}. '
                    f'Must be one of: {valid_magnitudes}.',
                )
            s.skip_ws()
            unit_str = s.take_identifier()
            try:
                unit = TimescaleUnit(unit_str)
            except ValueError:
                valid_units = ', '.join(u.value for u in TimescaleUnit)
                raise VCDParseError(
                    s.loc,
                    f'Invalid $timescale unit: {unit_str}. '
                    f'Must be one of: {valid_units}.',
                )
            s.take_end()
            timescale = Timescale(magnitude, unit)
            return Token(TokenKind.TIMESCALE, s.span(start), timescale)
        elif kw == 'upscope':
            s.take_ws_after_kw(kw)
            s.take_end()
            return Token(TokenKind.UPSCOPE, s.span(start), None)
        elif kw == 'var':
            s.take_ws_after_kw(kw)
            s.skip_ws()
            type_str = s.take_identifier()
            try:
                type_ = VarType(type_str)
            except ValueError:
                valid_types = ', '.join(t.value for t in VarType)
                raise VCDParseError(
                    s.loc,
                    f'Invalid $var type: {type_str}. Must be one of: {valid_types}',
                )
            s.skip_ws()
            size = s.take_decimal()
            s.skip_ws()
            id_code = s.take_id_code()
            s.skip_ws()
            ident = s.take_identifier()

            bit_index: Union[None, int, Tuple[int, int]]
            c = s.skip_ws()
            if c == 91:  # '['
                s.advance()
                bit_index = s.take_bit_index()
            else:
                bit_index = None
            s.take_end()
            var_decl = VarDecl(type_, size, id_code, ident, bit_index)
            return Token(TokenKind.VAR, s.span(start), var_decl)
        elif kw == 'version':
            s.take_ws_after_kw(kw)
            version = s.take_to_end()
            return Token(TokenKind.VERSION, s.span(start), version)
        # The remaining keywords are emitted on their own, without a payload
        # and without consuming anything after the keyword.
        elif kw == 'dumpall':
            return Token(TokenKind.DUMPALL, s.span(start), None)
        elif kw == 'dumpoff':
            return Token(TokenKind.DUMPOFF, s.span(start), None)
        elif kw == 'dumpon':
            return Token(TokenKind.DUMPON, s.span(start), None)
        elif kw == 'dumpvars':
            return Token(TokenKind.DUMPVARS, s.span(start), None)
        elif kw == 'end':
            return Token(TokenKind.END, s.span(start), None)
        else:
            raise VCDParseError(s.loc, f'invalid keyword ${kw}')
    else:
        raise VCDParseError(s.loc, f'confused: {chr(c)}')