Skip to content

API

Asynchronous client for the Wokwi Simulation API.

This class provides methods to connect to the Wokwi simulator, upload files, control simulations, and monitor serial output. It is designed to be asyncio-friendly and easy to use in Python scripts and applications. For a synchronous interface, see WokwiClientSync.

Initialize the WokwiClient.

Parameters:

Name Type Description Default
token str

API token for authentication (get from https://wokwi.com/dashboard/ci).

required
server Optional[str]

Optional custom server URL. Defaults to the public Wokwi server.

None
Source code in src/wokwi_client/client.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def __init__(self, token: str, server: Optional[str] = None):
    """
    Set up a client bound to a single transport.

    Args:
        token: API token for authentication (get from https://wokwi.com/dashboard/ci).
        server: Custom server URL. When omitted (or empty), DEFAULT_WS_URL is used.
    """
    self.version = get_version()

    # Fall back to the default endpoint when no server is given.
    ws_url = server if server else DEFAULT_WS_URL
    self._transport = Transport(token, ws_url)

    # Plain bookkeeping state; none of it needs a running event loop yet.
    self.last_pause_nanos = 0
    # Lazily create in an active event loop (important for py3.9 and sync client)
    self._pause_queue: Optional[EventQueue] = None
    self._serial_monitor_tasks: set[asyncio.Task[None]] = set()

    # Route "sim:pause" events from the transport to this client's handler.
    self._transport.add_event_listener("sim:pause", self._on_pause)

connect() -> dict[str, Any] async

Connect to the Wokwi simulator server.

Returns:

Type Description
dict[str, Any]

A dictionary with server information (e.g., version).

Source code in src/wokwi_client/client.py
65
66
67
68
69
70
71
72
async def connect(self) -> dict[str, Any]:
    """
    Open the connection to the Wokwi simulator server.

    Returns:
        The value produced by the transport's ``connect()`` call: a dictionary
        with server information (e.g., version).
    """
    server_info = await self._transport.connect()
    return server_info

disconnect() -> None async

Disconnect from the Wokwi simulator server.

This also stops all active serial monitors.

Source code in src/wokwi_client/client.py
74
75
76
77
78
79
80
81
async def disconnect(self) -> None:
    """
    Close the connection to the Wokwi simulator server.

    Active serial monitors are stopped first, then the transport is closed.
    """
    # Cancel the monitors before the transport goes away underneath them.
    self.stop_serial_monitors()
    transport = self._transport
    await transport.close()

download(name: str) -> bytes async

Download a file from the simulator.

Parameters:

Name Type Description Default
name str

The name of the file to download.

required

Returns:

Type Description
bytes

The downloaded file content as bytes.

Source code in src/wokwi_client/client.py
119
120
121
122
123
124
125
126
127
128
129
130
async def download(self, name: str) -> bytes:
    """
    Fetch a file from the simulator and return its contents.

    Args:
        name: The name of the file to download.

    Returns:
        The file content as bytes, decoded from the base64 payload found at
        ``result.binary`` in the response.
    """
    response = await download(self._transport, name)
    encoded_payload = response["result"]["binary"]
    return base64.b64decode(encoded_payload)

download_file(name: str, local_path: Optional[Path] = None) -> None async

Download a file from the simulator and save it to a local path.

Parameters:

Name Type Description Default
name str

The name of the file to download.

required
local_path Optional[Path]

The local path to save the downloaded file. If not provided, uses the name as the path.

None
Source code in src/wokwi_client/client.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
async def download_file(self, name: str, local_path: Optional[Path] = None) -> None:
    """
    Fetch a file from the simulator and write it to disk.

    Args:
        name: The name of the file to download.
        local_path: Where to store the file locally. When omitted, ``name`` is used as the path.
    """
    destination = Path(name) if local_path is None else local_path

    payload = await self.download(name)
    with open(destination, "wb") as out_file:
        out_file.write(payload)

gpio_list() -> list[str] async

Get a list of all GPIO pins available in the simulation.

Returns:

Type Description
list[str]

list[str]: Example: ["esp32:GPIO0", "esp32:GPIO1", ...]

Source code in src/wokwi_client/client.py
324
325
326
327
328
329
330
331
332
333
334
async def gpio_list(self) -> list[str]:
    """Return the names of all GPIO pins available in the simulation.

    Returns:
        list[str]: Example: ["esp32:GPIO0", "esp32:GPIO1", ...]

    Raises:
        ProtocolError: If ``result.pins`` in the response is not a list of strings.
    """
    response = await gpio_list(self._transport)
    pins: Any = response.get("result", {}).get("pins")

    # Accept the payload only when it is a list whose items are all strings.
    well_formed = isinstance(pins, list) and all(isinstance(pin, str) for pin in pins)
    if not well_formed:
        raise ProtocolError("Malformed gpio:list response: expected result.pins: list[str]")
    return cast(list[str], pins)

listen_pin(part: str, pin: str, listen: bool = True) -> None async

Start or stop listening for changes on a pin.

When enabled, "pin:change" events will be delivered via the transport's event mechanism.

Parameters:

Name Type Description Default
part str

The part id.

required
pin str

The pin name.

required
listen bool

True to start listening, False to stop.

True
Source code in src/wokwi_client/client.py
311
312
313
314
315
316
317
318
319
320
321
322
async def listen_pin(self, part: str, pin: str, listen: bool = True) -> None:
    """Enable or disable change notifications for a pin.

    While enabled, "pin:change" events will be delivered via the transport's
    event mechanism.

    Args:
        part: The part id.
        pin: The pin name.
        listen: True to start listening, False to stop.
    """
    transport = self._transport
    await pin_listen(transport, part=part, pin=pin, listen=listen)

pause_simulation() -> None async

Pause the running simulation.

Source code in src/wokwi_client/client.py
192
193
194
195
196
async def pause_simulation(self) -> None:
    """Ask the simulator to pause the running simulation."""
    transport = self._transport
    await pause(transport)

read_framebuffer_png_bytes(id: str) -> bytes async

Return the current framebuffer as PNG bytes.

Source code in src/wokwi_client/client.py
373
374
375
async def read_framebuffer_png_bytes(self, id: str) -> bytes:
    """Fetch the current framebuffer as PNG bytes.

    Args:
        id: Identifier forwarded to the framebuffer helper (presumably the
            id of the display part; confirm against the helper).
    """
    png_bytes = await read_framebuffer_png_bytes(self._transport, id=id)
    return png_bytes

read_pin(part: str, pin: str) -> PinReadMessage async

Read the current state of a pin.

Parameters:

Name Type Description Default
part str

The part id (e.g. "uno").

required
pin str

The pin name (e.g. "A2").

required
Source code in src/wokwi_client/client.py
301
302
303
304
305
306
307
308
309
async def read_pin(self, part: str, pin: str) -> PinReadMessage:
    """Query the current state of a pin.

    Args:
        part: The part id (e.g. "uno").
        pin: The pin name (e.g. "A2").

    Returns:
        The ``result`` entry of the pin-read response.
    """
    response = await pin_read(self._transport, part=part, pin=pin)
    reading = response["result"]
    return cast(PinReadMessage, reading)

read_vcd() -> VCDData async

Read logic analyzer data as VCD (Value Change Dump).

Returns the captured signals from the logic analyzer in VCD format, which can be viewed in tools like PulseView or GTKWave.

Returns:

Type Description
VCDData

VCD data containing the vcd string, channel_count, and sample_count.

Raises:

Type Description
WokwiError

If no logic analyzer is present in the diagram.

Source code in src/wokwi_client/client.py
381
382
383
384
385
386
387
388
389
390
391
392
393
async def read_vcd(self) -> VCDData:
    """Fetch logic analyzer data as VCD (Value Change Dump).

    The captured logic analyzer signals are returned in VCD format, which
    tools like PulseView or GTKWave can display.

    Returns:
        VCD data containing the vcd string, channel_count, and sample_count.

    Raises:
        WokwiError: If no logic analyzer is present in the diagram.
    """
    vcd_data = await read_vcd(self._transport)
    return vcd_data

restart_simulation(pause: bool = False) -> None async

Restart the simulation, optionally starting paused.

Parameters:

Name Type Description Default
pause bool

Whether to start the simulation paused (default: False).

False
Source code in src/wokwi_client/client.py
223
224
225
226
227
228
229
230
async def restart_simulation(self, pause: bool = False) -> None:
    """
    Restart the simulation.

    Args:
        pause: When True, the restarted simulation begins paused (default: False).
    """
    transport = self._transport
    await restart(transport, pause)

resume_simulation(pause_after: Optional[int] = None) -> None async

Resume the simulation, optionally pausing after a given number of nanoseconds.

Parameters:

Name Type Description Default
pause_after Optional[int]

Number of nanoseconds to run before pausing again (optional).

None
Source code in src/wokwi_client/client.py
198
199
200
201
202
203
204
205
async def resume_simulation(self, pause_after: Optional[int] = None) -> None:
    """
    Resume the simulation.

    Args:
        pause_after: If given, the number of nanoseconds to run before pausing again.
    """
    transport = self._transport
    await resume(transport, pause_after)

save_framebuffer_png(id: str, path: Path, overwrite: bool = True) -> Path async

Save the current framebuffer as a PNG file.

Source code in src/wokwi_client/client.py
377
378
379
async def save_framebuffer_png(self, id: str, path: Path, overwrite: bool = True) -> Path:
    """Write the current framebuffer to a PNG file.

    Args:
        id: Identifier forwarded to the framebuffer helper.
        path: Destination file path.
        overwrite: Forwarded to the helper (default True).

    Returns:
        The path returned by the framebuffer helper.
    """
    saved_path = await save_framebuffer_png(
        self._transport, id=id, path=path, overwrite=overwrite
    )
    return saved_path

save_vcd(path: Path, overwrite: bool = True) -> VCDData async

Save logic analyzer VCD data to a file.

Parameters:

Name Type Description Default
path Path

Destination file path.

required
overwrite bool

Overwrite existing file (default True).

True

Returns:

Type Description
VCDData

VCD data containing the vcd string, channel_count, and sample_count.

Raises:

Type Description
WokwiError

If file exists and overwrite=False, or if no logic analyzer is present in the diagram.

Source code in src/wokwi_client/client.py
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
async def save_vcd(self, path: Path, overwrite: bool = True) -> VCDData:
    """Write logic analyzer VCD data to a file.

    Args:
        path: Destination file path.
        overwrite: Overwrite existing file (default True).

    Returns:
        VCD data containing the vcd string, channel_count, and sample_count.

    Raises:
        WokwiError: If file exists and overwrite=False, or if no logic
            analyzer is present in the diagram.
    """
    vcd_data = await save_vcd(self._transport, path=path, overwrite=overwrite)
    return vcd_data

serial_monitor(callback: Callable[[bytes], Any]) -> asyncio.Task[None]

Start monitoring the serial output in the background and invoke callback for each line.

This method does not block: it creates and returns an asyncio.Task that runs until the transport is closed or the task is cancelled. The callback may be synchronous or async.

Example

task = client.serial_monitor(lambda line: print(line.decode(), end=""))
# ... do other async work ...
task.cancel()

Source code in src/wokwi_client/client.py
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
def serial_monitor(self, callback: Callable[[bytes], Any]) -> asyncio.Task[None]:
    """
    Watch the serial output in the background, calling `callback` once per line.

    The call returns immediately with an asyncio.Task; that task keeps running until
    the transport is closed or the task is cancelled. `callback` may be a plain
    function or return an awaitable, in which case it is awaited.

    Example:
        task = client.serial_monitor(lambda line: print(line.decode(), end=""))
        ... do other async work ...
        task.cancel()
    """

    async def _runner() -> None:
        try:
            async for chunk in monitor_lines(self._transport):
                try:
                    outcome = callback(chunk)
                    if inspect.isawaitable(outcome):
                        await outcome
                except Exception:
                    # Deliberately ignored: one failing callback must not end the
                    # monitor. Error handling belongs inside the callback itself.
                    pass
        finally:
            # Drop the finished task from the tracking set.
            self._serial_monitor_tasks.discard(monitor_task)

    monitor_task = asyncio.create_task(_runner(), name="wokwi-serial-monitor")
    self._serial_monitor_tasks.add(monitor_task)
    return monitor_task

serial_monitor_cat(decode_utf8: bool = True, errors: str = 'replace') -> None async

Print serial monitor output to stdout as it is received from the simulation.

Parameters:

Name Type Description Default
decode_utf8 bool

Whether to decode bytes as UTF-8. If False, prints raw bytes (default: True).

True
errors str

How to handle UTF-8 decoding errors. Options: 'strict', 'ignore', 'replace' (default: 'replace').

'replace'
Source code in src/wokwi_client/client.py
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
async def serial_monitor_cat(self, decode_utf8: bool = True, errors: str = "replace") -> None:
    """
    Echo serial monitor output to stdout as it arrives from the simulation.

    Args:
        decode_utf8: Decode each line as UTF-8 before printing. If False, the bytes
            object is printed as-is (default: True).
        errors: How to handle UTF-8 decoding errors. Options: 'strict', 'ignore', 'replace' (default: 'replace').
    """
    async for chunk in monitor_lines(self._transport):
        if not decode_utf8:
            print(chunk, end="", flush=True)
            continue
        try:
            text = chunk.decode("utf-8", errors=errors)
            print(text, end="", flush=True)
        except UnicodeDecodeError:
            # Decoding failed (possible with errors='strict'): print the bytes object instead.
            print(chunk, end="", flush=True)

serial_write(data: Union[bytes, str, list[int]]) -> None async

Write data to the simulation serial monitor interface.

Source code in src/wokwi_client/client.py
294
295
296
async def serial_write(self, data: Union[bytes, str, list[int]]) -> None:
    """Send data to the simulation serial monitor interface.

    Args:
        data: Payload to write, given as bytes, a string, or a list of ints.
    """
    transport = self._transport
    await write_serial(transport, data)

set_control(part: str, control: str, value: Union[int, bool, float]) -> None async

Set a control value (e.g. simulate button press).

Parameters:

Name Type Description Default
part str

Part id (e.g. "btn1").

required
control str

Control name (e.g. "pressed").

required
value Union[int, bool, float]

Control value to set (float).

required
Source code in src/wokwi_client/client.py
336
337
338
339
340
341
342
343
344
async def set_control(self, part: str, control: str, value: Union[int, bool, float]) -> None:
    """Change a control value on a part (e.g. simulate button press).

    Args:
        part: Part id (e.g. "btn1").
        control: Control name (e.g. "pressed").
        value: Control value to set.
    """
    transport = self._transport
    await set_control(transport, part=part, control=control, value=value)

start_simulation(firmware: str | list[FlashSection] | None = None, elf: Optional[str] = None, pause: bool = False, chips: list[str] = [], flash_size: Optional[int] = None) -> None async

Start a new simulation with the given parameters.

The firmware and ELF files must be uploaded to the simulator first using the upload() or upload_file() methods. The firmware file is required for the simulation to run. The ELF file is optional and can speed up the simulation in some cases.

For ESP-IDF projects, use upload_idf_firmware() to upload firmware sections from flasher_args.json, then pass the result's firmware and flash_size here.

The optional chips parameter can be used to load custom chips into the simulation. For each custom chip, you need to upload two files:

- A JSON file with the chip definition, called <chip_name>.chip.json.
- A binary file with the chip firmware, called <chip_name>.chip.bin.

For example, to load the inverter chip, you need to upload the inverter.chip.json and inverter.chip.bin files. Then you can pass ["inverter"] to the chips parameter, and reference it in your diagram.json file by adding a part with the type chip-inverter.

Parameters:

Name Type Description Default
firmware str | list[FlashSection] | None

Either a firmware filename (str) or a list of FlashSection objects (from upload_idf_firmware).

None
elf Optional[str]

The ELF file filename (optional).

None
pause bool

Whether to start the simulation paused (default: False).

False
chips list[str]

List of custom chips to load into the simulation (default: empty list).

[]
flash_size Optional[int]

Flash size in megabytes (optional, typically from IdfFirmwareUploadResult).

None
Source code in src/wokwi_client/client.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
async def start_simulation(
    self,
    firmware: "str | list[FlashSection] | None" = None,
    elf: Optional[str] = None,
    pause: bool = False,
    chips: list[str] = [],
    flash_size: Optional[int] = None,
) -> None:
    """
    Start a new simulation with the given parameters.

    The firmware and ELF files must be uploaded to the simulator first using the
    `upload()` or `upload_file()` methods.
    The firmware file is required for the simulation to run.
    The ELF file is optional and can speed up the simulation in some cases.

    For ESP-IDF projects, use `upload_idf_firmware()` to upload firmware sections
    from flasher_args.json, then pass the result's firmware and flash_size here.

    The optional `chips` parameter can be used to load custom chips into the simulation.
    For each custom chip, you need to upload two files:
    - A JSON file with the chip definition, called `<chip_name>.chip.json`.
    - A binary file with the chip firmware, called `<chip_name>.chip.bin`.

    For example, to load the `inverter` chip, you need to upload the `inverter.chip.json`
    and `inverter.chip.bin` files. Then you can pass `["inverter"]` to the `chips` parameter,
    and reference it in your diagram.json file by adding a part with the type `chip-inverter`.

    Args:
        firmware: Either a firmware filename (str) or a list of FlashSection
            objects (from upload_idf_firmware).
        elf: The ELF file filename (optional).
        pause: Whether to start the simulation paused (default: False).
        chips: List of custom chips to load into the simulation (default: empty list).
            The list is copied before being forwarded, so neither the caller's
            list nor the shared default can be modified downstream.
        flash_size: Flash size in megabytes (optional, typically from IdfFirmwareUploadResult).
    """
    # The `[]` default is one list object shared by every call. Always hand the
    # callee a fresh list so that shared default can never be mutated.
    chip_names = list(chips) if chips else []
    await start(
        self._transport,
        firmware=firmware,
        flash_size=flash_size,
        elf=elf,
        pause=pause,
        chips=chip_names,
    )

stop_serial_monitors() -> None

Stop all active serial monitor tasks.

This method cancels all tasks created by the serial_monitor method. After calling this method, all active serial monitors will stop receiving data.

Source code in src/wokwi_client/client.py
264
265
266
267
268
269
270
271
272
273
def stop_serial_monitors(self) -> None:
    """
    Cancel every active serial monitor task.

    All tasks tracked in ``_serial_monitor_tasks`` (those created by the
    serial_monitor method) are cancelled, and the tracking set is emptied.
    """
    # Work on a snapshot so the set can change while tasks are being cancelled.
    active_tasks = self._serial_monitor_tasks.copy()
    for monitor_task in active_tasks:
        monitor_task.cancel()
    self._serial_monitor_tasks.clear()

touch_event(part: str, x: float, y: float, event: str, release_after: Optional[int] = None) -> None async

Send a touch event to a part with a touchscreen.

Parameters:

Name Type Description Default
part str

Part id (e.g. "lcd1").

required
x float

X coordinate (touch controller coordinates).

required
y float

Y coordinate (touch controller coordinates).

required
event str

Touch event type: "press", "release", or "move".

required
release_after Optional[int]

For "press" events, automatically release after this many nanoseconds of simulation time (optional).

None
Source code in src/wokwi_client/client.py
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
async def touch_event(
    self,
    part: str,
    x: float,
    y: float,
    event: str,
    release_after: Optional[int] = None,
) -> None:
    """Deliver a touch event to a part that has a touchscreen.

    Args:
        part: Part id (e.g. "lcd1").
        x: X coordinate (touch controller coordinates).
        y: Y coordinate (touch controller coordinates).
        event: Touch event type: "press", "release", or "move".
        release_after: For "press" events, automatically release
            after this many nanoseconds of simulation time (optional).
    """
    transport = self._transport
    await touch_event(
        transport, part=part, x=x, y=y, event=event, release_after=release_after
    )

upload(name: str, content: bytes) -> None async

Upload a file to the simulator from bytes content.

Parameters:

Name Type Description Default
name str

The name to use for the uploaded file.

required
content bytes

The file content as bytes.

required
Source code in src/wokwi_client/client.py
83
84
85
86
87
88
89
90
91
async def upload(self, name: str, content: bytes) -> None:
    """
    Send in-memory bytes to the simulator as a file.

    Args:
        name: The name to use for the uploaded file.
        content: The file content as bytes.
    """
    transport = self._transport
    await upload(transport, name, content)

upload_file(filename: str, local_path: Optional[Path] = None) -> str async

Upload a local file to the simulator.

Parameters:

Name Type Description Default
filename str

The name to use for the uploaded file.

required
local_path Optional[Path]

Optional path to the local file. If not provided, uses filename as the path.

None

Returns: The filename of the uploaded file.

Source code in src/wokwi_client/client.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
async def upload_file(self, filename: str, local_path: Optional[Path] = None) -> str:
    """
    Send a local file to the simulator.

    Args:
        filename: The name to use for the uploaded file.
        local_path: Optional path to the local file. If not provided, uses filename as the path.

    Returns:
        The filename of the uploaded file.
    """
    uploaded_name = await upload_file(self._transport, filename, local_path)
    return uploaded_name

upload_idf_firmware(flasher_args_path: str | Path) -> IdfFirmwareUploadResult async

Upload ESP-IDF firmware from a flasher_args.json file.

Reads flasher_args.json, uploads each flash section (bootloader, partition table, app) individually, and returns the section metadata needed for start_simulation().

Parameters:

Name Type Description Default
flasher_args_path str | Path

Path to the flasher_args.json file.

required

Returns: An IdfFirmwareUploadResult with firmware sections and optional flash_size.

Source code in src/wokwi_client/client.py
105
106
107
108
109
110
111
112
113
114
115
116
117
async def upload_idf_firmware(self, flasher_args_path: "str | Path") -> IdfFirmwareUploadResult:
    """
    Upload ESP-IDF firmware described by a flasher_args.json file.

    The flasher_args.json file is read and each flash section (bootloader, partition
    table, app) is uploaded individually; the section metadata needed for
    start_simulation() is returned.

    Args:
        flasher_args_path: Path to the flasher_args.json file.

    Returns:
        An IdfFirmwareUploadResult with firmware sections and optional flash_size.
    """
    upload_result = await upload_idf_firmware(self._transport, flasher_args_path)
    return upload_result

wait_until_simulation_time(seconds: float) -> None async

Pause and resume the simulation until the given simulation time (in seconds) is reached.

Parameters:

Name Type Description Default
seconds float

The simulation time to wait for, in seconds.

required
Source code in src/wokwi_client/client.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
async def wait_until_simulation_time(self, seconds: float) -> None:
    """
    Run the simulation until the given simulation time (in seconds) is reached.

    The simulation is paused first. If the target lies beyond ``last_pause_nanos``,
    it is resumed for the remaining nanoseconds and this call waits for the next
    "sim:pause" event.

    Args:
        seconds: The simulation time to wait for, in seconds.
    """
    await pause(self._transport)

    target_nanos = seconds * 1e9
    nanos_left = target_nanos - self.last_pause_nanos
    if not (nanos_left > 0):
        # Target already reached (or not a positive distance away): nothing to run.
        return

    # The queue is created on first use, inside the running event loop.
    if self._pause_queue is None:
        self._pause_queue = EventQueue(self._transport, "sim:pause")
    pause_events = self._pause_queue

    # Flush the queue before resuming, then wait for the next pause event.
    pause_events.flush()
    await resume(self._transport, int(nanos_left))
    await pause_events.get()

Synchronous client for the Wokwi Simulation API.

Design

• A private asyncio loop runs on a dedicated background thread.
• Public methods mirror the async API by submitting the underlying coroutine calls onto that loop and waiting for results (blocking).
• Long-lived streamers (serial monitors) are scheduled on the loop and tracked, so we can cancel & drain them on disconnect().

Source code in src/wokwi_client/client_sync.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def __init__(self, token: str, server: str | None = None):
    """
    Start the background event loop thread and create the wrapped async client.

    Args:
        token: Passed through to ``WokwiClient``.
        server: Passed through to ``WokwiClient``.

    Raises:
        RuntimeError: If the loop-started event is not set within 8 seconds.
    """
    # Create a new event loop for the background thread
    self._loop = asyncio.new_event_loop()
    # Event to signal that the event loop is running
    # (presumably set by _run_loop once the loop is up; confirm there)
    self._loop_started_event = threading.Event()
    # Start a daemon thread that runs the event loop via _run_loop
    self._thread = threading.Thread(
        target=self._run_loop, args=(self._loop,), daemon=True, name="wokwi-sync-loop"
    )
    self._thread.start()
    # Wait until the loop is reported as started before proceeding (prevents race conditions)
    if not self._loop_started_event.wait(timeout=8.0):  # bounded wait so a failed start cannot hang the caller
        raise RuntimeError("WokwiClientSync event loop failed to start")
    # Construct the underlying async client. NOTE(review): this runs in the calling
    # thread, not on the background loop; confirm WokwiClient needs no running loop here.
    self._async_client = WokwiClient(token, server)
    # Track background monitor tasks (futures) for cancellation on exit
    self._bg_futures: set[Future[Any]] = set()
    # Flag to avoid double-closing
    self._closed = False

__getattr__(name: str) -> Any

Delegate attribute access to the underlying async client.

If the attribute on WokwiClient is a coroutine function, return a sync wrapper that blocks until the coroutine completes.

Source code in src/wokwi_client/client_sync.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def __getattr__(self, name: str) -> Any:
    """
    Delegate attribute access to the underlying async client.

    If the attribute on `WokwiClient` is a coroutine function, return a
    sync wrapper that blocks until the coroutine completes. Any other
    attribute of the async client is returned unchanged.

    Args:
        name: Name of the attribute that normal lookup did not find.

    Raises:
        AttributeError: If the wrapped client has not been set on this instance
            yet, or if the wrapped client has no such attribute.
    """
    # __getattr__ only runs after normal lookup has failed. Fetch the wrapped
    # client without re-entering __getattr__: on an instance where
    # `_async_client` is not set (e.g. __init__ raised early), a plain
    # `self._async_client` would call this method again and recurse without bound.
    try:
        async_client = object.__getattribute__(self, "_async_client")
    except AttributeError:
        raise AttributeError(
            f"{type(self).__name__!r} object has no attribute {name!r}"
        ) from None

    # Explicit methods (like serial_monitor functions above) take precedence over __getattr__
    attr = getattr(async_client, name)
    if callable(attr):
        # Get the function object from WokwiClient class (unbound) to check if coroutine
        func = getattr(WokwiClient, name, None)
        if func is not None and inspect.iscoroutinefunction(func):
            # Wrap coroutine method to run in background loop
            def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
                return self._call(attr(*args, **kwargs))

            sync_wrapper.__name__ = name
            sync_wrapper.__doc__ = getattr(func, "__doc__", "")
            return sync_wrapper
    return attr

connect() -> dict[str, Any]

Connect to the simulator (blocking) and return server info.

Source code in src/wokwi_client/client_sync.py
94
95
96
def connect(self) -> dict[str, Any]:
    """Connect to the simulator, blocking until done, and return the server info."""
    pending_connect = self._async_client.connect()
    return self._call(pending_connect)

serial_monitor(callback: Callable[[bytes], Any]) -> None

Start monitoring the serial output in the background and invoke callback for each line. Non-blocking. Runs until disconnect().

The callback may be sync or async. Exceptions raised by the callback are swallowed to keep the monitor alive (add your own logging as needed).

Source code in src/wokwi_client/client_sync.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def serial_monitor(self, callback: Callable[[bytes], Any]) -> None:
    """
    Start monitoring the serial output in the background and invoke `callback`
    for each line. Non-blocking. Runs until `disconnect()`.

    The callback may be sync or async. Exceptions raised by the callback are
    swallowed to keep the monitor alive (add your own logging as needed).

    Args:
        callback: Called with each line as bytes; if it returns an awaitable,
            that awaitable is awaited on the background loop.
    """

    async def _runner() -> None:
        # monitor_lines yields the serial output line by line.
        async for line in monitor_lines(self._async_client._transport):
            try:
                result = callback(line)  # invoke callback with the raw bytes line
                if inspect.isawaitable(result):
                    await result  # await if callback is async
            except Exception:
                # Deliberate best-effort: swallow callback errors to keep the monitor alive
                pass

    # Schedule the serial monitor runner on the event loop:
    task_future = asyncio.run_coroutine_threadsafe(_runner(), self._loop)
    self._bg_futures.add(task_future)
    # Untrack the future once it finishes. A done-callback is used instead of a
    # `finally` inside the coroutine: the coroutine runs on the loop thread and
    # could finish before `task_future` is bound or added here. A callback
    # registered on an already-finished future runs immediately.
    task_future.add_done_callback(lambda fut: self._bg_futures.discard(fut))

serial_monitor_cat(decode_utf8: bool = True, errors: str = 'replace') -> None

Print serial monitor output in the background (non-blocking). Runs until disconnect().

Parameters:

Name Type Description Default
decode_utf8 bool

Whether to decode bytes as UTF-8 (default True).

True
errors str

UTF-8 decoding error strategy ('strict'|'ignore'|'replace').

'replace'
Source code in src/wokwi_client/client_sync.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def serial_monitor_cat(self, decode_utf8: bool = True, errors: str = "replace") -> None:
    """
    Print serial monitor output in the background (non-blocking). Runs until `disconnect()`.

    Args:
        decode_utf8: Whether to decode bytes as UTF-8 (default True). If False,
            each bytes object is printed as-is.
        errors: UTF-8 decoding error strategy ('strict'|'ignore'|'replace').
    """

    async def _runner() -> None:
        async for line in monitor_lines(self._async_client._transport):
            try:
                if decode_utf8:
                    # Decode bytes to string (handle errors per parameter)
                    text = line.decode("utf-8", errors=errors)
                    print(text, end="", flush=True)
                else:
                    # Print the bytes object as-is
                    print(line, end="", flush=True)
            except Exception:
                # Deliberate best-effort: swallow decode/print errors to keep the stream alive
                pass

    task_future = asyncio.run_coroutine_threadsafe(_runner(), self._loop)
    self._bg_futures.add(task_future)
    # Untrack the future once it finishes. A done-callback is used instead of a
    # `finally` inside the coroutine: the coroutine runs on the loop thread and
    # could finish before `task_future` is bound or added here. A callback
    # registered on an already-finished future runs immediately.
    task_future.add_done_callback(lambda fut: self._bg_futures.discard(fut))

stop_serial_monitors() -> None

Stop all active serial monitor background tasks.

Source code in src/wokwi_client/client_sync.py
190
191
192
193
194
def stop_serial_monitors(self) -> None:
    """Cancel every tracked serial monitor future and empty the tracking set."""
    # Iterate over a snapshot so the set can change while futures are cancelled.
    tracked = list(self._bg_futures)
    for monitor_future in tracked:
        monitor_future.cancel()
    self._bg_futures.clear()