API

Wokwi Python Client Library

Typed, asyncio-friendly Python SDK for the Wokwi Simulation API.

Provides the WokwiClient class for connecting to, controlling, and monitoring Wokwi simulations from Python.

WokwiClient(token: str, server: Optional[str] = None)

Asynchronous client for the Wokwi Simulation API.

This class provides methods to connect to the Wokwi simulator, upload files, control simulations, and monitor serial output. It is designed to be asyncio-friendly and easy to use in Python scripts and applications.

Initialize the WokwiClient.

Parameters:

token (str, required): API token for authentication (get from https://wokwi.com/dashboard/ci).
server (Optional[str], default None): Optional custom server URL. Defaults to the public Wokwi server.

Source code in src/wokwi_client/client.py
def __init__(self, token: str, server: Optional[str] = None):
    """
    Initialize the WokwiClient.

    Args:
        token: API token for authentication (get from https://wokwi.com/dashboard/ci).
        server: Optional custom server URL. Defaults to the public Wokwi server.
    """
    self.version = get_version()
    self._transport = Transport(token, server or DEFAULT_WS_URL)
    self.last_pause_nanos = 0
    self._transport.add_event_listener("sim:pause", self._on_pause)
    self._pause_queue = EventQueue(self._transport, "sim:pause")
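
The constructor only needs a token, so a common pattern is to read it from the environment rather than hard-code it. The sketch below assumes the token lives in a variable named `WOKWI_CLI_TOKEN`; that name and the `load_token` helper are illustrative assumptions, not part of the documented API.

```python
import os
from typing import Mapping, Optional


def load_token(env: Optional[Mapping[str, str]] = None) -> str:
    """Return the API token from the environment, or raise a clear error.

    WOKWI_CLI_TOKEN is an assumed variable name; use whatever your setup defines.
    """
    env = os.environ if env is None else env
    token = env.get("WOKWI_CLI_TOKEN", "").strip()
    if not token:
        raise RuntimeError(
            "Set WOKWI_CLI_TOKEN (create a token at https://wokwi.com/dashboard/ci)"
        )
    return token


# With the library installed, the client could then be created like this
# (assuming WokwiClient is importable from the package root):
#
#     from wokwi_client import WokwiClient
#     client = WokwiClient(load_token())
```

Failing early with a readable message is usually easier to debug than an authentication error raised later during `connect()`.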

connect() -> dict[str, Any] async

Connect to the Wokwi simulator server.

Returns:

dict[str, Any]: A dictionary with server information (e.g., version).

Source code in src/wokwi_client/client.py
async def connect(self) -> dict[str, Any]:
    """
    Connect to the Wokwi simulator server.

    Returns:
        A dictionary with server information (e.g., version).
    """
    return await self._transport.connect()

disconnect() -> None async

Disconnect from the Wokwi simulator server.

Source code in src/wokwi_client/client.py
async def disconnect(self) -> None:
    """
    Disconnect from the Wokwi simulator server.
    """
    await self._transport.close()
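
Since `connect()` and `disconnect()` come as a pair, it can help to wrap them so the connection is closed even when the work in between fails. This is a minimal sketch: `run_connected` is a hypothetical helper, `client` is any object exposing the two methods documented above, and the `"version"` key is only the example the docs mention, so it is read with `.get()`.

```python
from typing import Any, Awaitable, Callable


async def run_connected(client: Any, body: Callable[[Any], Awaitable[Any]]) -> Any:
    """Connect, run `body(client)`, and always disconnect afterwards."""
    hello = await client.connect()
    # "version" is the example key named in the docs; .get() avoids assuming it exists.
    print("Connected, server version:", hello.get("version"))
    try:
        return await body(client)
    finally:
        # Runs on success and on error, so the connection is not left open.
        await client.disconnect()
```

With a real client this might be invoked as `asyncio.run(run_connected(client, my_coroutine_function))`, where `my_coroutine_function` is your own async function taking the client.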

pause_simulation() -> ResponseMessage async

Pause the running simulation.

Returns:

ResponseMessage: The response message from the server.

Source code in src/wokwi_client/client.py
async def pause_simulation(self) -> ResponseMessage:
    """
    Pause the running simulation.

    Returns:
        The response message from the server.
    """
    return await pause(self._transport)

restart_simulation(pause: bool = False) -> ResponseMessage async

Restart the simulation, optionally starting paused.

Parameters:

pause (bool, default False): Whether to start the simulation paused.

Returns:

ResponseMessage: The response message from the server.

Source code in src/wokwi_client/client.py
async def restart_simulation(self, pause: bool = False) -> ResponseMessage:
    """
    Restart the simulation, optionally starting paused.

    Args:
        pause: Whether to start the simulation paused (default: False).

    Returns:
        The response message from the server.
    """
    return await restart(self._transport, pause)

resume_simulation(pause_after: Optional[int] = None) -> ResponseMessage async

Resume the simulation, optionally pausing after a given number of nanoseconds.

Parameters:

pause_after (Optional[int], default None): Number of nanoseconds to run before pausing again (optional).

Returns:

ResponseMessage: The response message from the server.

Source code in src/wokwi_client/client.py
async def resume_simulation(self, pause_after: Optional[int] = None) -> ResponseMessage:
    """
    Resume the simulation, optionally pausing after a given number of nanoseconds.

    Args:
        pause_after: Number of nanoseconds to run before pausing again (optional).

    Returns:
        The response message from the server.
    """
    return await resume(self._transport, pause_after)
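
Because `pause_after` is expressed in nanoseconds, a small conversion helper avoids off-by-a-factor mistakes. The sketch below is illustrative: `millis_to_nanos` and `run_for_millis` are hypothetical names, and `client` is any object with the `resume_simulation` method documented above.

```python
from typing import Any

NANOS_PER_MILLI = 1_000_000


def millis_to_nanos(millis: float) -> int:
    """Convert milliseconds of simulation time to whole nanoseconds."""
    return int(round(millis * NANOS_PER_MILLI))


async def run_for_millis(client: Any, millis: float) -> Any:
    """Resume the simulation and ask the server to pause again after `millis` ms."""
    return await client.resume_simulation(pause_after=millis_to_nanos(millis))
```

Judging from the source of `wait_until_simulation_time`, the response to a resume request seems to arrive before the simulation pauses again, so code that needs to block until the pause should probably use that method instead.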

serial_monitor_cat() -> None async

Print serial monitor output to stdout as it is received from the simulation.

Source code in src/wokwi_client/client.py
async def serial_monitor_cat(self) -> None:
    """
    Print serial monitor output to stdout as it is received from the simulation.
    """
    async for line in monitor_lines(self._transport):
        print(line.decode("utf-8"), end="", flush=True)
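
As the source shows, `serial_monitor_cat()` keeps printing for as long as lines arrive, so awaiting it directly would block the caller. One way to combine it with other work is to run it as a background task and cancel it when done. The helper name `run_with_serial_output` is hypothetical, and `client` is any object with the two methods used here.

```python
import asyncio
import contextlib
from typing import Any


async def run_with_serial_output(client: Any, seconds: float) -> None:
    """Echo serial output while the simulation runs for `seconds` of simulation time."""
    # Run the serial printer concurrently so it does not block this coroutine.
    serial_task = asyncio.create_task(client.serial_monitor_cat())
    try:
        await client.wait_until_simulation_time(seconds)
    finally:
        serial_task.cancel()
        # Wait for the cancellation to finish; CancelledError is expected here.
        with contextlib.suppress(asyncio.CancelledError):
            await serial_task
```

Awaiting the cancelled task before returning keeps the event loop from reporting a pending task at shutdown.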

start_simulation(firmware: str, elf: Optional[str] = None, pause: bool = False, chips: list[str] = []) -> ResponseMessage async

Start a new simulation with the given parameters.

The firmware and ELF files must be uploaded to the simulator first using the upload() or upload_file() methods. The firmware file is required for the simulation to run. The ELF file is optional and can speed up the simulation in some cases.

The optional chips parameter can be used to load custom chips into the simulation. For each custom chip, you need to upload two files:

- A JSON file with the chip definition, called <chip_name>.chip.json.
- A binary file with the chip firmware, called <chip_name>.chip.bin.

For example, to load the inverter chip, you need to upload the inverter.chip.json and inverter.chip.bin files. Then you can pass ["inverter"] to the chips parameter, and reference it in your diagram.json file by adding a part with the type chip-inverter.

Parameters:

firmware (str, required): The firmware binary filename.
elf (Optional[str], default None): The ELF file filename (optional).
pause (bool, default False): Whether to start the simulation paused.
chips (list[str], default []): List of custom chips to load into the simulation.

Returns:

ResponseMessage: The response message from the server.

Source code in src/wokwi_client/client.py
async def start_simulation(
    self,
    firmware: str,
    elf: Optional[str] = None,
    pause: bool = False,
    chips: list[str] = [],
) -> ResponseMessage:
    """
    Start a new simulation with the given parameters.

    The firmware and ELF files must be uploaded to the simulator first using the
    `upload()` or `upload_file()` methods.
    The firmware file is required for the simulation to run.
    The ELF file is optional and can speed up the simulation in some cases.

    The optional `chips` parameter can be used to load custom chips into the simulation.
    For each custom chip, you need to upload two files:
    - A JSON file with the chip definition, called `<chip_name>.chip.json`.
    - A binary file with the chip firmware, called `<chip_name>.chip.bin`.

    For example, to load the `inverter` chip, you need to upload the `inverter.chip.json`
    and `inverter.chip.bin` files. Then you can pass `["inverter"]` to the `chips` parameter,
    and reference it in your diagram.json file by adding a part with the type `chip-inverter`.

    Args:
        firmware: The firmware binary filename.
        elf: The ELF file filename (optional).
        pause: Whether to start the simulation paused (default: False).
        chips: List of custom chips to load into the simulation (default: empty list).

    Returns:
        The response message from the server.
    """
    return await start(
        self._transport,
        firmware=firmware,
        elf=elf,
        pause=pause,
        chips=chips,
    )
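
The upload-then-start sequence for custom chips can be sketched as follows. The file naming follows the `<chip_name>.chip.json` / `<chip_name>.chip.bin` rule described above; the helper names are hypothetical, `client` is any object with `upload_file` and `start_simulation`, and uploading `diagram.json` from the working directory is an assumption about the project layout.

```python
from typing import Any, Sequence


def chip_files(chip_name: str) -> list[str]:
    """Return the two upload names required for a custom chip."""
    return [f"{chip_name}.chip.json", f"{chip_name}.chip.bin"]


async def start_with_chips(client: Any, firmware: str, chips: Sequence[str]) -> Any:
    """Upload the firmware plus each chip's files, then start the simulation."""
    # Assumption: diagram.json sits in the current directory and references
    # each chip through a part of type "chip-<chip_name>".
    await client.upload_file("diagram.json")
    await client.upload_file(firmware)
    for chip in chips:
        for name in chip_files(chip):
            await client.upload_file(name)
    return await client.start_simulation(firmware=firmware, chips=list(chips))
```

For the inverter example, this would be called as `await start_with_chips(client, "firmware.bin", ["inverter"])`, where `firmware.bin` is a placeholder filename.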

upload(name: str, content: bytes) -> ResponseMessage async

Upload a file to the simulator from bytes content.

Parameters:

name (str, required): The name to use for the uploaded file.
content (bytes, required): The file content as bytes.

Returns:

ResponseMessage: The response message from the server.

Source code in src/wokwi_client/client.py
async def upload(self, name: str, content: bytes) -> ResponseMessage:
    """
    Upload a file to the simulator from bytes content.

    Args:
        name: The name to use for the uploaded file.
        content: The file content as bytes.

    Returns:
        The response message from the server.
    """
    return await upload(self._transport, name, content)
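
`upload()` is useful when the content is generated in memory rather than read from disk. The sketch below serializes a dictionary to JSON bytes and uploads it under the name `diagram.json`. The helper names are hypothetical, and the structure of the dictionary is left to the caller; this example does not validate it against the Wokwi diagram format.

```python
import json
from typing import Any


def encode_diagram(diagram: dict) -> bytes:
    """Serialize a diagram dictionary to UTF-8 encoded JSON bytes."""
    return json.dumps(diagram, indent=2).encode("utf-8")


async def upload_diagram(client: Any, diagram: dict) -> Any:
    """Upload an in-memory diagram under the name "diagram.json"."""
    return await client.upload("diagram.json", encode_diagram(diagram))
```

The same approach works for any generated file, as long as the content is passed as `bytes` rather than `str`.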

upload_file(filename: str, local_path: Optional[Path] = None) -> ResponseMessage async

Upload a local file to the simulator.

Parameters:

filename (str, required): The name to use for the uploaded file.
local_path (Optional[Path], default None): Optional path to the local file. If not provided, uses filename as the path.

Returns:

ResponseMessage: The response message from the server.

Source code in src/wokwi_client/client.py
async def upload_file(
    self, filename: str, local_path: Optional[Path] = None
) -> ResponseMessage:
    """
    Upload a local file to the simulator.

    Args:
        filename: The name to use for the uploaded file.
        local_path: Optional path to the local file. If not provided, uses filename as the path.

    Returns:
        The response message from the server.
    """
    return await upload_file(self._transport, filename, local_path)
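
The `local_path` argument matters when the file on disk lives somewhere other than where the simulator should see it, for example in a build directory. A minimal sketch, where `upload_from_build_dir` is a hypothetical helper and `client` is any object with the `upload_file` method documented above:

```python
from pathlib import Path
from typing import Any


async def upload_from_build_dir(client: Any, build_dir: Path, filename: str) -> Any:
    """Upload build_dir/filename so the simulator sees it as just `filename`."""
    return await client.upload_file(filename, build_dir / filename)
```

For instance, `await upload_from_build_dir(client, Path("build"), "firmware.bin")` would read `build/firmware.bin` locally and upload it as `firmware.bin`; both names are placeholders.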

wait_until_simulation_time(seconds: float) -> None async

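Run the simulation until the given simulation time (in seconds) is reached. The method pauses the simulation, resumes it for the remaining time, and returns once the simulation pauses again at the target time. If the target time has already passed, the simulation is left paused.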

Parameters:

seconds (float, required): The simulation time to wait for, in seconds.

Source code in src/wokwi_client/client.py
async def wait_until_simulation_time(self, seconds: float) -> None:
    """
    Pause and resume the simulation until the given simulation time (in seconds) is reached.

    Args:
        seconds: The simulation time to wait for, in seconds.
    """
    await pause(self._transport)
    remaining_nanos = seconds * 1e9 - self.last_pause_nanos
    if remaining_nanos > 0:
        self._pause_queue.flush()
        await resume(self._transport, int(remaining_nanos))
        await self._pause_queue.get()
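
The arithmetic in the source above can be checked in isolation. The function below is a standalone restatement of the `remaining_nanos` computation, written for illustration only; it is not part of the library.

```python
def remaining_nanos(target_seconds: float, last_pause_nanos: int) -> int:
    """Nanoseconds still to simulate before `target_seconds` is reached.

    Mirrors `seconds * 1e9 - self.last_pause_nanos` from the source above.
    A result of zero or less means the target time has already passed.
    """
    return int(target_seconds * 1e9 - last_pause_nanos)


# Paused at 2 s of simulation time, target is 5 s: 3 s are left to run.
print(remaining_nanos(5, 2_000_000_000))
# Paused at 2 s, target is 1 s: negative, so no resume request is sent.
print(remaining_nanos(1, 2_000_000_000))
```

Only a positive result leads to a resume request, which is why calling the method with a time in the past leaves the simulation paused.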