Skip to content

API

Asynchronous client for the Wokwi Simulation API.

This class provides methods to connect to the Wokwi simulator, upload files, control simulations, and monitor serial output. It is designed to be asyncio-friendly and easy to use in Python scripts and applications. For a synchronous interface, see WokwiClientSync.

Initialize the WokwiClient.

Parameters:

Name Type Description Default
token str

API token for authentication (get from https://wokwi.com/dashboard/ci).

required
server Optional[str]

Optional custom server URL. Defaults to the public Wokwi server.

None
Source code in src/wokwi_client/client.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def __init__(self, token: str, server: Optional[str] = None):
    """
    Initialize the WokwiClient.

    Builds the transport for `server` (falling back to `DEFAULT_WS_URL` when
    `server` is None or empty) and registers `_on_pause` as the listener for
    "sim:pause" events.

    Args:
        token: API token for authentication (get from https://wokwi.com/dashboard/ci).
        server: Optional custom server URL. Defaults to the public Wokwi server.
    """
    self.version = get_version()
    self._transport = Transport(token, server or DEFAULT_WS_URL)
    # Read by wait_until_simulation_time() to compute how much simulation time is
    # left; presumably refreshed by _on_pause (handler not shown here).
    self.last_pause_nanos = 0
    self._transport.add_event_listener("sim:pause", self._on_pause)
    # Lazily create in an active event loop (important for py3.9 and sync client)
    self._pause_queue: Optional[EventQueue] = None
    # Tasks created by serial_monitor(); tracked so stop_serial_monitors() can cancel them.
    self._serial_monitor_tasks: set[asyncio.Task[None]] = set()

connect() -> dict[str, Any] async

Connect to the Wokwi simulator server.

Returns:

Type Description
dict[str, Any]

A dictionary with server information (e.g., version).

Source code in src/wokwi_client/client.py
56
57
58
59
60
61
62
63
async def connect(self) -> dict[str, Any]:
    """
    Open the connection to the Wokwi simulator server.

    Returns:
        Whatever the transport's `connect()` yields: a dictionary with server
        information (e.g., version).
    """
    server_info = await self._transport.connect()
    return server_info

disconnect() -> None async

Disconnect from the Wokwi simulator server.

This also stops all active serial monitors.

Source code in src/wokwi_client/client.py
65
66
67
68
69
70
71
72
async def disconnect(self) -> None:
    """
    Close the connection to the Wokwi simulator server.

    Active serial monitors are stopped first, then the transport is closed.
    """
    # Cancel monitors before the transport goes away.
    self.stop_serial_monitors()
    closing = self._transport.close()
    await closing

download(name: str) -> bytes async

Download a file from the simulator.

Parameters:

Name Type Description Default
name str

The name of the file to download.

required

Returns:

Type Description
bytes

The downloaded file content as bytes.

Source code in src/wokwi_client/client.py
 99
100
101
102
103
104
105
106
107
108
109
110
async def download(self, name: str) -> bytes:
    """
    Fetch a file from the simulator.

    Args:
        name: The name of the file to download.

    Returns:
        The file content as bytes, decoded from the base64 payload found at
        `result.binary` in the response.
    """
    response = await download(self._transport, name)
    encoded = response["result"]["binary"]
    return base64.b64decode(encoded)

download_file(name: str, local_path: Optional[Path] = None) -> None async

Download a file from the simulator and save it to a local path.

Parameters:

Name Type Description Default
name str

The name of the file to download.

required
local_path Optional[Path]

The local path to save the downloaded file. If not provided, uses the name as the path.

None
Source code in src/wokwi_client/client.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
async def download_file(self, name: str, local_path: Optional[Path] = None) -> None:
    """
    Fetch a file from the simulator and write it to disk.

    Args:
        name: The name of the file to download.
        local_path: Destination path on the local machine. When omitted, the
            file is written to a path equal to `name`.
    """
    destination = Path(name) if local_path is None else local_path
    payload = await self.download(name)
    with open(destination, "wb") as out:
        out.write(payload)

gpio_list() -> list[str] async

Get a list of all GPIO pins available in the simulation.

Returns:

Type Description
list[str]

list[str]: Example: ["esp32:GPIO0", "esp32:GPIO1", ...]

Source code in src/wokwi_client/client.py
297
298
299
300
301
302
303
304
305
306
307
async def gpio_list(self) -> list[str]:
    """List every GPIO pin available in the simulation.

    Returns:
        list[str]: Example: ["esp32:GPIO0", "esp32:GPIO1", ...]

    Raises:
        ProtocolError: If `result.pins` in the response is not a list of strings.
    """
    response = await gpio_list(self._transport)
    pins: Any = response.get("result", {}).get("pins")
    well_formed = isinstance(pins, list) and all(isinstance(pin, str) for pin in pins)
    if not well_formed:
        raise ProtocolError("Malformed gpio:list response: expected result.pins: list[str]")
    return cast(list[str], pins)

listen_pin(part: str, pin: str, listen: bool = True) -> None async

Start or stop listening for changes on a pin.

When enabled, "pin:change" events will be delivered via the transport's event mechanism.

Parameters:

Name Type Description Default
part str

The part id.

required
pin str

The pin name.

required
listen bool

True to start listening, False to stop.

True
Source code in src/wokwi_client/client.py
284
285
286
287
288
289
290
291
292
293
294
295
async def listen_pin(self, part: str, pin: str, listen: bool = True) -> None:
    """Enable or disable change notifications for a pin.

    While enabled, "pin:change" events are delivered through the transport's
    event mechanism.

    Args:
        part: The part id.
        pin: The pin name.
        listen: True to start listening, False to stop.
    """
    request = {"part": part, "pin": pin, "listen": listen}
    await pin_listen(self._transport, **request)

pause_simulation() -> None async

Pause the running simulation.

Source code in src/wokwi_client/client.py
165
166
167
168
169
async def pause_simulation(self) -> None:
    """Send a pause request for the running simulation over the transport."""
    transport = self._transport
    await pause(transport)

read_framebuffer_png_bytes(id: str) -> bytes async

Return the current framebuffer as PNG bytes.

Source code in src/wokwi_client/client.py
319
320
321
async def read_framebuffer_png_bytes(self, id: str) -> bytes:
    """Fetch the current framebuffer of the part `id` as PNG-encoded bytes."""
    png_bytes = await read_framebuffer_png_bytes(self._transport, id=id)
    return png_bytes

read_pin(part: str, pin: str) -> PinReadMessage async

Read the current state of a pin.

Parameters:

Name Type Description Default
part str

The part id (e.g. "uno").

required
pin str

The pin name (e.g. "A2").

required
Source code in src/wokwi_client/client.py
274
275
276
277
278
279
280
281
282
async def read_pin(self, part: str, pin: str) -> PinReadMessage:
    """Query the current state of a pin.

    Args:
        part: The part id (e.g. "uno").
        pin: The pin name (e.g. "A2").

    Returns:
        The `result` entry of the pin-read response.
    """
    response = await pin_read(self._transport, part=part, pin=pin)
    reading = response["result"]
    return cast(PinReadMessage, reading)

restart_simulation(pause: bool = False) -> None async

Restart the simulation, optionally starting paused.

Parameters:

Name Type Description Default
pause bool

Whether to start the simulation paused (default: False).

False
Source code in src/wokwi_client/client.py
196
197
198
199
200
201
202
203
async def restart_simulation(self, pause: bool = False) -> None:
    """
    Restart the simulation, optionally leaving it paused at start.

    Args:
        pause: Whether to start the simulation paused (default: False).
    """
    transport = self._transport
    await restart(transport, pause)

resume_simulation(pause_after: Optional[int] = None) -> None async

Resume the simulation, optionally pausing after a given number of nanoseconds.

Parameters:

Name Type Description Default
pause_after Optional[int]

Number of nanoseconds to run before pausing again (optional).

None
Source code in src/wokwi_client/client.py
171
172
173
174
175
176
177
178
async def resume_simulation(self, pause_after: Optional[int] = None) -> None:
    """
    Resume the simulation, optionally asking it to pause again later.

    Args:
        pause_after: Number of nanoseconds to run before pausing again (optional).
    """
    transport = self._transport
    await resume(transport, pause_after)

save_framebuffer_png(id: str, path: Path, overwrite: bool = True) -> Path async

Save the current framebuffer as a PNG file.

Source code in src/wokwi_client/client.py
323
324
325
async def save_framebuffer_png(self, id: str, path: Path, overwrite: bool = True) -> Path:
    """Write the current framebuffer of the part `id` to `path` as a PNG file.

    Returns:
        The path returned by the underlying `save_framebuffer_png` helper.
    """
    options = {"id": id, "path": path, "overwrite": overwrite}
    saved_path = await save_framebuffer_png(self._transport, **options)
    return saved_path

serial_monitor(callback: Callable[[bytes], Any]) -> asyncio.Task[None]

Start monitoring the serial output in the background and invoke callback for each line.

This method does not block: it creates and returns an asyncio.Task that runs until the transport is closed or the task is cancelled. The callback may be synchronous or async.

Example

task = client.serial_monitor(lambda line: print(line.decode(), end=""))
# ... do other async work ...
task.cancel()

Source code in src/wokwi_client/client.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
def serial_monitor(self, callback: Callable[[bytes], Any]) -> asyncio.Task[None]:
    """
    Spawn a background task that passes every serial line to `callback`.

    The call **returns immediately** with the created asyncio.Task; the task keeps
    running until the line stream ends or the task is cancelled. `callback` may be
    a plain function or return an awaitable (which is then awaited).

    Example:
        task = client.serial_monitor(lambda line: print(line.decode(), end=""))
        ... do other async work ...
        task.cancel()
    """

    async def _dispatch(chunk: bytes) -> None:
        # Support both sync callbacks and callbacks that return an awaitable.
        outcome = callback(chunk)
        if inspect.isawaitable(outcome):
            await outcome

    async def _pump() -> None:
        try:
            async for chunk in monitor_lines(self._transport):
                try:
                    await _dispatch(chunk)
                except Exception:
                    # Deliberately ignored so one failing callback does not stop
                    # the monitor; callers handle errors inside the callback.
                    continue
        finally:
            # Stop tracking this task once it finishes for any reason.
            self._serial_monitor_tasks.discard(monitor_task)

    monitor_task = asyncio.create_task(_pump(), name="wokwi-serial-monitor")
    self._serial_monitor_tasks.add(monitor_task)
    return monitor_task

serial_monitor_cat(decode_utf8: bool = True, errors: str = 'replace') -> None async

Print serial monitor output to stdout as it is received from the simulation.

Parameters:

Name Type Description Default
decode_utf8 bool

Whether to decode bytes as UTF-8. If False, prints raw bytes (default: True).

True
errors str

How to handle UTF-8 decoding errors. Options: 'strict', 'ignore', 'replace' (default: 'replace').

'replace'
Source code in src/wokwi_client/client.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
async def serial_monitor_cat(self, decode_utf8: bool = True, errors: str = "replace") -> None:
    """
    Echo serial monitor output to stdout as it arrives from the simulation.

    Args:
        decode_utf8: Whether to decode bytes as UTF-8. If False, the bytes object
            is passed to `print` as-is (default: True).
        errors: How to handle UTF-8 decoding errors. Options: 'strict', 'ignore', 'replace' (default: 'replace').
    """
    async for raw in monitor_lines(self._transport):
        if not decode_utf8:
            print(raw, end="", flush=True)
            continue
        try:
            print(raw.decode("utf-8", errors=errors), end="", flush=True)
        except UnicodeDecodeError:
            # Decoding failed (possible with errors='strict'): fall back to the bytes.
            print(raw, end="", flush=True)

serial_write(data: Union[bytes, str, list[int]]) -> None async

Write data to the simulation serial monitor interface.

Source code in src/wokwi_client/client.py
267
268
269
async def serial_write(self, data: Union[bytes, str, list[int]]) -> None:
    """Send `data` to the simulation's serial monitor interface."""
    transport = self._transport
    await write_serial(transport, data)

set_control(part: str, control: str, value: Union[int, bool, float]) -> None async

Set a control value (e.g. simulate button press).

Parameters:

Name Type Description Default
part str

Part id (e.g. "btn1").

required
control str

Control name (e.g. "pressed").

required
value Union[int, bool, float]

Control value to set (int, bool, or float).

required
Source code in src/wokwi_client/client.py
309
310
311
312
313
314
315
316
317
async def set_control(self, part: str, control: str, value: Union[int, bool, float]) -> None:
    """Assign a value to a part's control (e.g. simulate a button press).

    Args:
        part: Part id (e.g. "btn1").
        control: Control name (e.g. "pressed").
        value: Control value to set.
    """
    request = {"part": part, "control": control, "value": value}
    await set_control(self._transport, **request)

start_simulation(firmware: str, elf: Optional[str] = None, pause: bool = False, chips: list[str] = []) -> None async

Start a new simulation with the given parameters.

The firmware and ELF files must be uploaded to the simulator first using the upload() or upload_file() methods. The firmware file is required for the simulation to run. The ELF file is optional and can speed up the simulation in some cases.

The optional chips parameter can be used to load custom chips into the simulation. For each custom chip, you need to upload two files:

- A JSON file with the chip definition, called <chip_name>.chip.json.
- A binary file with the chip firmware, called <chip_name>.chip.bin.

For example, to load the inverter chip, you need to upload the inverter.chip.json and inverter.chip.bin files. Then you can pass ["inverter"] to the chips parameter, and reference it in your diagram.json file by adding a part with the type chip-inverter.

Parameters:

Name Type Description Default
firmware str

The firmware binary filename.

required
elf Optional[str]

The ELF file filename (optional).

None
pause bool

Whether to start the simulation paused (default: False).

False
chips list[str]

List of custom chips to load into the simulation (default: empty list).

[]
Source code in src/wokwi_client/client.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
async def start_simulation(
    self,
    firmware: str,
    elf: Optional[str] = None,
    pause: bool = False,
    chips: list[str] = [],
) -> None:
    """
    Start a new simulation with the given parameters.

    The firmware and ELF files must be uploaded to the simulator first using the
    `upload()` or `upload_file()` methods.
    The firmware file is required for the simulation to run.
    The ELF file is optional and can speed up the simulation in some cases.

    The optional `chips` parameter can be used to load custom chips into the simulation.
    For each custom chip, you need to upload two files:
    - A JSON file with the chip definition, called `<chip_name>.chip.json`.
    - A binary file with the chip firmware, called `<chip_name>.chip.bin`.

    For example, to load the `inverter` chip, you need to upload the `inverter.chip.json`
    and `inverter.chip.bin` files. Then you can pass `["inverter"]` to the `chips` parameter,
    and reference it in your diagram.json file by adding a part with the type `chip-inverter`.

    Args:
        firmware: The firmware binary filename.
        elf: The ELF file filename (optional).
        pause: Whether to start the simulation paused (default: False).
        chips: List of custom chips to load into the simulation (default: empty list).
            The list is copied before being handed on, so it is never mutated.
    """
    await start(
        self._transport,
        firmware=firmware,
        elf=elf,
        pause=pause,
        # The default `[]` is a single list object shared by every call. Passing a
        # copy keeps that shared default (and any caller-owned list) from being
        # modified by downstream code.
        chips=list(chips),
    )

stop_serial_monitors() -> None

Stop all active serial monitor tasks.

This method cancels all tasks created by the serial_monitor method. After calling this method, all active serial monitors will stop receiving data.

Source code in src/wokwi_client/client.py
237
238
239
240
241
242
243
244
245
246
def stop_serial_monitors(self) -> None:
    """
    Cancel every tracked serial monitor task.

    All tasks registered by the serial_monitor method are cancelled and the
    tracking set is emptied afterwards.
    """
    # Snapshot first: the set may be modified while tasks are being cancelled.
    tracked = tuple(self._serial_monitor_tasks)
    for monitor_task in tracked:
        monitor_task.cancel()
    self._serial_monitor_tasks.clear()

upload(name: str, content: bytes) -> None async

Upload a file to the simulator from bytes content.

Parameters:

Name Type Description Default
name str

The name to use for the uploaded file.

required
content bytes

The file content as bytes.

required
Source code in src/wokwi_client/client.py
74
75
76
77
78
79
80
81
82
async def upload(self, name: str, content: bytes) -> None:
    """
    Send in-memory bytes to the simulator as a file.

    Args:
        name: The name to use for the uploaded file.
        content: The file content as bytes.
    """
    transport = self._transport
    await upload(transport, name, content)

upload_file(filename: str, local_path: Optional[Path] = None) -> str async

Upload a local file to the simulator. If you specify the local_path to the file flasher_args.json (IDF flash information), the contents of the file will be processed and the correct firmware file will be uploaded instead, returning the firmware filename.

Parameters:

Name Type Description Default
filename str

The name to use for the uploaded file.

required
local_path Optional[Path]

Optional path to the local file. If not provided, uses filename as the path.

None

Returns: The filename of the uploaded file (useful for idf when uploading flasher_args.json).

Source code in src/wokwi_client/client.py
84
85
86
87
88
89
90
91
92
93
94
95
96
97
async def upload_file(self, filename: str, local_path: Optional[Path] = None) -> str:
    """
    Upload a local file to the simulator.

    If `local_path` points to a `flasher_args.json` file (IDF flash information),
    its contents are processed and the correct firmware file is uploaded instead;
    the firmware filename is then returned.

    Args:
        filename: The name to use for the uploaded file.
        local_path: Optional path to the local file. If not provided, uses filename as the path.

    Returns:
        The filename of the uploaded file (useful for idf when uploading flasher_args.json).
    """
    uploaded_name = await upload_file(self._transport, filename, local_path)
    return uploaded_name

wait_until_simulation_time(seconds: float) -> None async

Pause and resume the simulation until the given simulation time (in seconds) is reached.

Parameters:

Name Type Description Default
seconds float

The simulation time to wait for, in seconds.

required
Source code in src/wokwi_client/client.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
async def wait_until_simulation_time(self, seconds: float) -> None:
    """
    Run the simulation until the given simulation time (in seconds) is reached.

    The simulation is paused first. If the target lies beyond
    `last_pause_nanos`, it is resumed for the remaining nanoseconds and this
    coroutine waits for the next "sim:pause" event.

    Args:
        seconds: The simulation time to wait for, in seconds.
    """
    await pause(self._transport)
    nanos_left = seconds * 1e9 - self.last_pause_nanos
    if not nanos_left > 0:
        # Target time already reached; leave the simulation paused.
        return

    queue = self._pause_queue
    if queue is None:
        # Created on first use so it is built while an event loop is active.
        queue = self._pause_queue = EventQueue(self._transport, "sim:pause")
    # Flush the queue so only the pause produced by the resume below is awaited.
    queue.flush()
    await resume(self._transport, int(nanos_left))
    await queue.get()

Synchronous client for the Wokwi Simulation API.

Design

• A private asyncio loop runs on a dedicated background thread.
• Public methods mirror the async API by submitting the underlying coroutine calls onto that loop and waiting for results (blocking).
• Long-lived streamers (serial monitors) are scheduled on the loop and tracked, so we can cancel & drain them on disconnect().

Source code in src/wokwi_client/client_sync.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def __init__(self, token: str, server: str | None = None):
    """
    Initialize the synchronous client and start its background event loop.

    A new asyncio loop is created and run on a daemon thread named
    "wokwi-sync-loop"; construction blocks until that loop signals it has
    started, then the wrapped `WokwiClient` is created.

    Args:
        token: Passed through to `WokwiClient`.
        server: Passed through to `WokwiClient`.

    Raises:
        RuntimeError: If the loop does not signal start-up within 8 seconds.
    """
    # Create a new event loop for the background thread
    self._loop = asyncio.new_event_loop()
    # Event to signal that the event loop is running
    # (presumably set by _run_loop, which is not shown here)
    self._loop_started_event = threading.Event()
    # Start background thread running the event loop
    self._thread = threading.Thread(
        target=self._run_loop, args=(self._loop,), daemon=True, name="wokwi-sync-loop"
    )
    self._thread.start()
    # Wait until the loop is fully started before proceeding (prevents race conditions)
    if not self._loop_started_event.wait(timeout=8.0):  # timeout to avoid deadlock
        raise RuntimeError("WokwiClientSync event loop failed to start")
    # Construct the underlying async client once the loop is running
    # (the constructor call itself executes on the caller's thread)
    self._async_client = WokwiClient(token, server)
    # Track background monitor tasks (futures) for cancellation on exit
    self._bg_futures: set[Future[Any]] = set()
    # Flag to avoid double-closing
    self._closed = False

__getattr__(name: str) -> Any

Delegate attribute access to the underlying async client.

If the attribute on WokwiClient is a coroutine function, return a sync wrapper that blocks until the coroutine completes.

Source code in src/wokwi_client/client_sync.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def __getattr__(self, name: str) -> Any:
    """
    Delegate attribute access to the underlying async client.

    If the attribute on `WokwiClient` is a coroutine function, return a
    sync wrapper that blocks until the coroutine completes. Any other
    attribute is returned unchanged.

    Raises:
        AttributeError: If `name` does not exist on the async client, or if
            the async client itself has not been set on this instance.
    """
    # __getattr__ only runs after normal lookup has failed. If `_async_client`
    # itself is missing (e.g. __init__ raised before assigning it, or the object
    # was created without __init__ by copy/pickle), reading `self._async_client`
    # below would re-enter this method and recurse until RecursionError.
    if name == "_async_client":
        raise AttributeError(
            f"{type(self).__name__!r} object has no attribute {name!r}"
        )

    # Explicit methods (like serial_monitor functions above) take precedence over __getattr__
    attr = getattr(self._async_client, name)
    if not callable(attr):
        return attr

    # Get the function object from WokwiClient class (unbound) to check if coroutine
    func = getattr(WokwiClient, name, None)
    if func is None or not inspect.iscoroutinefunction(func):
        return attr

    # Wrap coroutine method to run in background loop
    def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
        return self._call(attr(*args, **kwargs))

    sync_wrapper.__name__ = name
    sync_wrapper.__doc__ = getattr(func, "__doc__", "")
    return sync_wrapper

connect() -> dict[str, Any]

Connect to the simulator (blocking) and return server info.

Source code in src/wokwi_client/client_sync.py
94
95
96
def connect(self) -> dict[str, Any]:
    """Blocking connect: run the async client's `connect()` and return its server info."""
    pending = self._async_client.connect()
    return self._call(pending)

serial_monitor(callback: Callable[[bytes], Any]) -> None

Start monitoring the serial output in the background and invoke callback for each line. Non-blocking. Runs until disconnect().

The callback may be sync or async. Exceptions raised by the callback are swallowed to keep the monitor alive (add your own logging as needed).

Source code in src/wokwi_client/client_sync.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def serial_monitor(self, callback: Callable[[bytes], Any]) -> None:
    """
    Start monitoring the serial output in the background and invoke `callback`
    for each line. Non-blocking. Runs until `disconnect()`.

    The callback may be sync or async. Exceptions raised by the callback are
    swallowed to keep the monitor alive (add your own logging as needed).
    """

    async def _runner() -> None:
        # monitor_lines yields the serial lines for the async client's transport.
        async for line in monitor_lines(self._async_client._transport):
            try:
                result = callback(line)  # invoke callback with the raw bytes line
                if inspect.isawaitable(result):
                    await result  # await if callback is async
            except Exception:
                # Swallow exceptions from callback to keep monitor alive
                pass

    # Schedule the serial monitor runner on the event loop.
    task_future = asyncio.run_coroutine_threadsafe(_runner(), self._loop)
    self._bg_futures.add(task_future)
    # Untrack the future when it finishes. A done-callback is used rather than a
    # `finally` inside the coroutine because the coroutine runs on the loop thread
    # and could finish before this thread has bound and added `task_future`.
    # add_done_callback fires immediately if the future is already done, so no
    # stale entry can remain in the set.
    task_future.add_done_callback(self._bg_futures.discard)

serial_monitor_cat(decode_utf8: bool = True, errors: str = 'replace') -> None

Print serial monitor output in the background (non-blocking). Runs until disconnect().

Parameters:

Name Type Description Default
decode_utf8 bool

Whether to decode bytes as UTF-8 (default True).

True
errors str

UTF-8 decoding error strategy ('strict'|'ignore'|'replace').

'replace'
Source code in src/wokwi_client/client_sync.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def serial_monitor_cat(self, decode_utf8: bool = True, errors: str = "replace") -> None:
    """
    Print serial monitor output in the background (non-blocking). Runs until `disconnect()`.

    Args:
        decode_utf8: Whether to decode bytes as UTF-8 (default True).
        errors: UTF-8 decoding error strategy ('strict'|'ignore'|'replace').
    """

    async def _runner() -> None:
        # monitor_lines yields the serial lines for the async client's transport.
        async for line in monitor_lines(self._async_client._transport):
            try:
                if decode_utf8:
                    # Decode bytes to string (handle errors per parameter)
                    text = line.decode("utf-8", errors=errors)
                    print(text, end="", flush=True)
                else:
                    # Print raw bytes
                    print(line, end="", flush=True)
            except Exception:
                # Swallow decode/print errors to keep stream alive
                pass

    task_future = asyncio.run_coroutine_threadsafe(_runner(), self._loop)
    self._bg_futures.add(task_future)
    # Untrack the future when it finishes. A done-callback is used rather than a
    # `finally` inside the coroutine because the coroutine runs on the loop thread
    # and could finish before this thread has bound and added `task_future`.
    # add_done_callback fires immediately if the future is already done, so no
    # stale entry can remain in the set.
    task_future.add_done_callback(self._bg_futures.discard)

stop_serial_monitors() -> None

Stop all active serial monitor background tasks.

Source code in src/wokwi_client/client_sync.py
190
191
192
193
194
def stop_serial_monitors(self) -> None:
    """Cancel every tracked serial monitor future and empty the tracking set."""
    # Snapshot first: the set may be modified while futures are being cancelled.
    pending = tuple(self._bg_futures)
    for future in pending:
        future.cancel()
    self._bg_futures.clear()