Skip to content

cillow.Server

Cillow server component.

This class is responsible for managing request workers and client manager.

  • max_interpreters limits the total number of processes that can be created.
  • interpreters_per_client limits the number of processes that can be created per client.
  • num_worker_threads limits the number of request worker threads.
  • max_queue_size limits the maximum size of the request queue.

Examples:

>>> import cillow
>>>
>>> if __name__ == "__main__":
...     server = cillow.Server(port=5556, max_interpreters=2, interpreters_per_client=1)
...     server.run()

Don't trust LLMS? Concerned about arbitrary code execution? Take full control by limiting functionalities using patches.

To add patches, use the add_patches() function. To clear patches, use clear_patches().

Examples:

>>> import cillow
>>>
>>> import os
>>> from contextlib import contextmanager
>>>
>>> os_system_switchable = cillow.Switchable(os.system)
>>>
>>> @contextmanager
... def patch_os_system():
...     def disabled_os_system(command: str):
...         return "os.system has been disabled."
...
...     with os_system_switchable.switch_to(disabled_os_system):
...         yield
...
>>> cillow.add_patches(
...     patch_os_system,  # Disable os.system
...     cillow.prebuilt_patches.patch_stdout_stderr_write,  # To capture stdout and stderr
...     cillow.prebuilt_patches.patch_matplotlib_pyplot_show,  # To capture matplotlib figures
...     cillow.prebuilt_patches.patch_pillow_show,  # To capture PIL images
... )
>>>
>>> if __name__ == "__main__":
...     server = cillow.Server(port=5556, max_interpreters=2, interpreters_per_client=1)
...     server.run()
Source code in cillow/server/__init__.py
class Server(Logger):
    """
    Cillow server component.

    This class is responsible for managing request workers and client manager.

    - `max_interpreters` limits the total number of processes that can be created.
    - `interpreters_per_client` limits the number of processes that can be created per client.
    - `num_worker_threads` limits the number of request worker threads.
    - `max_queue_size` limits the maximum size of the request queue.

    Examples:
        >>> import cillow
        >>>
        >>> if __name__ == "__main__":
        ...     server = cillow.Server(port=5556, max_interpreters=2, interpreters_per_client=1)
        ...     server.run()

    Don't trust LLMS? Concerned about arbitrary code execution?
    Take full control by limiting functionalities using patches.

    To add patches, use the `add_patches()` function. To clear patches, use `clear_patches()`.

    Examples:
        >>> import cillow
        >>>
        >>> import os
        >>> from contextlib import contextmanager
        >>>
        >>> os_system_switchable = cillow.Switchable(os.system)
        >>>
        >>> @contextmanager
        ... def patch_os_system():
        ...     def disabled_os_system(command: str):
        ...         return "os.system has been disabled."
        ...
        ...     with os_system_switchable.switch_to(disabled_os_system):
        ...         yield
        ...
        >>> cillow.add_patches(
        ...     patch_os_system,  # Disable os.system
        ...     cillow.prebuilt_patches.patch_stdout_stderr_write,  # To capture stdout and stderr
        ...     cillow.prebuilt_patches.patch_matplotlib_pyplot_show,  # To capture matplotlib figures
        ...     cillow.prebuilt_patches.patch_pillow_show,  # To capture PIL images
        ... )
        >>>
        >>> if __name__ == "__main__":
        ...     server = cillow.Server(port=5556, max_interpreters=2, interpreters_per_client=1)
        ...     server.run()
    """

    def __init__(
        self,
        *,
        port: int,
        max_interpreters: int | None = None,
        interpreters_per_client: int | None = None,
        num_worker_threads: int | None = None,
        max_queue_size: int | None = None,
    ):
        """
        Args:
            port: The port to bind the server to
            max_interpreters: Maximum total interpreter processes allowed. (defaults to `os.cpu_count()`)
            interpreters_per_client: Maximum interpreters per client (defaults to `min(2, max_interpreters)`)
            num_worker_threads: Number of worker threads (defaults to `min(2 * max_clients, os.cpu_count())`)
            max_queue_size: Maximum queue size (defaults to `max_clients * interpreters_per_client * 2`)
        """
        self.socket = zmq.Context().socket(zmq.ROUTER)
        self._url = f"tcp://0.0.0.0:{port}"
        self.socket.bind(self._url)

        self._client_manager = ClientManager(max_interpreters, interpreters_per_client)

        if num_worker_threads is None:
            num_worker_threads = self._client_manager.optimal_number_of_request_workers

        if max_queue_size is None:
            max_queue_size = self._client_manager.optimal_max_queue_size

        self.logger.info(f"Max interpreter processes: {self._client_manager.max_interpreters}")
        self.logger.info(f"Interpreter processes per client: {self._client_manager.interpreters_per_client}")
        self.logger.info(f"Number of worker threads: {num_worker_threads}")
        self.logger.info(f"Max request queue size: {max_queue_size}")

        self._request_queue = Queue(maxsize=max_queue_size)  # type: ignore[var-annotated]

        def send_cb(client_id: bytes, msg_type: bytes, msg_body: bytes) -> None:
            self.socket.send_multipart([client_id, b"", msg_type, msg_body])

        self._callback = send_cb
        self._server_event = ThreadEvent()
        self._request_workers = [
            RequestWorker(
                self._request_queue,
                self._client_manager,
                self._callback,  # type: ignore[arg-type]
                self._server_event,
            )
            for _ in range(num_worker_threads)
        ]

    def run(self) -> None:
        """Run the server and block until interrupted."""

        self.logger.info("Starting worker threads...")
        for worker in self._request_workers:
            worker.start()

        signal(Signals.SIGINT, lambda s, f: self._server_event.set())
        signal(Signals.SIGTERM, lambda s, f: self._server_event.set())

        self._server_event.clear()

        try:
            self.logger.info(f"Listening on {self._url}")
            self.logger.info("Press Ctrl+C to exit.")
            while not self._server_event.is_set():
                if not self.socket.poll(timeout=1000):
                    continue

                try:
                    frames = self.socket.recv_multipart(flags=zmq.NOBLOCK)
                    if len(frames) != 3:
                        self._callback(frames[0], b"request_exception", b"Invalid number of frames received")

                    client_id, _, request_bytes = frames
                    try:
                        self._request_queue.put_nowait((client_id, request_bytes))
                    except QueueFullError:
                        self._callback(
                            client_id, b"request_exception", b"Server request queue is full. Try again later."
                        )

                except zmq.ZMQError:
                    pass

        except Exception as e:
            self.logger.error(f"{e.__class__.__name__}: {e!s}")

        finally:
            self.logger.info("Cleaning up resources...")
            self._client_manager.cleanup()

            self.logger.info("Stopping worker threads...")
            for worker in self._request_workers:
                worker.join()

            self.socket.close()
            self.socket.context.term()
            self.logger.info("Shutdown complete.")

run

Run the server and block until interrupted.

Source code in cillow/server/__init__.py
def run(self) -> None:
    """Run the server and block until interrupted."""

    self.logger.info("Starting worker threads...")
    for worker in self._request_workers:
        worker.start()

    signal(Signals.SIGINT, lambda s, f: self._server_event.set())
    signal(Signals.SIGTERM, lambda s, f: self._server_event.set())

    self._server_event.clear()

    try:
        self.logger.info(f"Listening on {self._url}")
        self.logger.info("Press Ctrl+C to exit.")
        while not self._server_event.is_set():
            if not self.socket.poll(timeout=1000):
                continue

            try:
                frames = self.socket.recv_multipart(flags=zmq.NOBLOCK)
                if len(frames) != 3:
                    self._callback(frames[0], b"request_exception", b"Invalid number of frames received")

                client_id, _, request_bytes = frames
                try:
                    self._request_queue.put_nowait((client_id, request_bytes))
                except QueueFullError:
                    self._callback(
                        client_id, b"request_exception", b"Server request queue is full. Try again later."
                    )

            except zmq.ZMQError:
                pass

    except Exception as e:
        self.logger.error(f"{e.__class__.__name__}: {e!s}")

    finally:
        self.logger.info("Cleaning up resources...")
        self._client_manager.cleanup()

        self.logger.info("Stopping worker threads...")
        for worker in self._request_workers:
            worker.join()

        self.socket.close()
        self.socket.context.term()
        self.logger.info("Shutdown complete.")

Client Manager

Manages clients and their interpreter processes.

This class is utilized by the server component to share the instance with all the request worker threads.

Source code in cillow/server/client_manager.py
class ClientManager(Logger):
    """
    Manages clients and their interpreter processes.

    This class is utilized by the server component to share the instance with all the request worker threads.
    """

    def __init__(self, max_interpreters: int | None = None, interpreters_per_client: int | None = None):
        """
        Initialize the client manager.

        Args:
            max_interpreters: Maximum total interpreter processes allowed. Defaults to `os.cpu_count()`
            interpreters_per_client: Maximum processes per client. Defaults to `min(2, max_interpreters)`
        """
        self.cpu_count = multiprocessing.cpu_count()
        self.max_interpreters = min(max_interpreters or self.cpu_count, self.cpu_count)
        self.interpreters_per_client = interpreters_per_client or min(2, self.max_interpreters)
        self.max_clients = self.max_interpreters // self.interpreters_per_client

        self._lock = threading.Lock()
        self._clients: dict[str, ClientInfo] = {}

    @property
    def optimal_number_of_request_workers(self) -> int:
        """Optimal number of request worker threads based on current limits."""
        return min(2 * self.max_clients, self.cpu_count)

    @property
    def optimal_max_queue_size(self) -> int:
        """Get optimal maximum queue size based on current limits."""
        return self.max_clients * self.interpreters_per_client * 2

    @property
    def total_active_processes(self) -> int:
        """Get total number of active interpreter processes."""
        return sum(len(client.interpreters) for client in self._clients.values())

    def register(self, client_id: str, environment: PythonEnvironment | str = "$system") -> None:
        """
        Register a client if possible.

        Args:
            client_id: The client identifier
            environment: The environment to use. This environment will be used as default when an interpreter process is deleted.

        Raises:
            Exception: If the client limit is exceeded.
            LookupError: If the given environment is invalid or not found.
        """
        with self._lock:
            if client_id in self._clients:
                return

            # Check if a new client can be accepted based on current limits.
            if not len(self._clients) < self.max_clients:
                raise Exception("Client limit exceeded. Try again later.")

            environment = validate_environment(environment or "$system")
            interpreter = _InterpreterProcess(environment)
            self._clients[client_id] = ClientInfo(
                default_environment=environment,
                current=CurrentContext(environment=environment, interpreter=interpreter),
                interpreters={environment: interpreter},
            )
            self.logger.info(f"Client {client_id!r} joined the server with {str(environment)!r} environment")

    def get_info(self, client_id: str) -> ClientInfo | None:
        """Get client info"""
        with self._lock:
            return self._clients.get(client_id)

    def switch_interpreter(self, client_id: str, environment: PythonEnvironment | str) -> PythonEnvironment:
        """
        Switch client to interpreter process based on the given environment.

        Args:
            client_id: The client identifier
            environment: The environment to switch to

        Raises:
            Exception: If unable to create new interpreter due to process limit
            LookupError: If the given environment is invalid or not found
            ValueError: If client is not found

        Returns:
            The valid Python environment value
        """
        with self._lock:
            if (client_info := self._clients.get(client_id)) is None:
                raise ValueError(f"Client {client_id!r} not found.")

            environment = validate_environment(environment)
            if client_info.current.environment == environment:
                return environment

            if not (interpreter := client_info.interpreters.get(environment)):
                # Check if client and total process limits are met to create new interpreter
                if (
                    len(client_info.interpreters) < self.interpreters_per_client
                    and self.total_active_processes < self.max_interpreters
                ):
                    interpreter = _InterpreterProcess(environment)
                    client_info.interpreters[environment] = interpreter
                else:
                    raise Exception("Unable to create new interpreter due to process limit.")

            self._clients[client_id].current = CurrentContext(environment=environment, interpreter=interpreter)
            return environment

    def delete_interpreter(self, client_id: str, environment: PythonEnvironment | str) -> None:
        """
        Delete client's interpreter processes at the given environment.

        Args:
            client_id: The client identifier
            environment: The environment associated with the interpreter
        """
        with self._lock:
            if (client_info := self._clients.get(client_id)) is None:
                return

            try:
                environment = validate_environment(environment)
                client_info.interpreters.pop(environment).stop()
            except KeyError:
                return

    def remove(self, client_id: str) -> None:
        """
        Remove a client and stop all its interpreter processes.

        Args:
            client_id: The client identifier
        """
        with self._lock:
            if (client_info := self._clients.get(client_id)) is None:
                return

            for interpreter in client_info.interpreters.values():
                interpreter.stop()
            del self._clients[client_id]
            self.logger.info(f"Client {client_id!r} left the server")

    def cleanup(self) -> None:
        """Stop all the interpreter processes."""
        for info in self._clients.values():
            for interpreter in info.interpreters.values():
                interpreter.stop()
        self._clients.clear()

optimal_max_queue_size property

Get optimal maximum queue size based on current limits.

optimal_number_of_request_workers property

Optimal number of request worker threads based on current limits.

total_active_processes property

Get total number of active interpreter processes.

cleanup

Stop all the interpreter processes.

Source code in cillow/server/client_manager.py
def cleanup(self) -> None:
    """Stop all the interpreter processes."""
    for info in self._clients.values():
        for interpreter in info.interpreters.values():
            interpreter.stop()
    self._clients.clear()

delete_interpreter

Delete client's interpreter processes at the given environment.

Parameters:

Name Type Description Default
client_id str

The client identifier

required
environment PythonEnvironment | str

The environment associated with the interpreter

required
Source code in cillow/server/client_manager.py
def delete_interpreter(self, client_id: str, environment: PythonEnvironment | str) -> None:
    """
    Delete client's interpreter processes at the given environment.

    Args:
        client_id: The client identifier
        environment: The environment associated with the interpreter
    """
    with self._lock:
        if (client_info := self._clients.get(client_id)) is None:
            return

        try:
            environment = validate_environment(environment)
            client_info.interpreters.pop(environment).stop()
        except KeyError:
            return

get_info

Get client info

Source code in cillow/server/client_manager.py
def get_info(self, client_id: str) -> ClientInfo | None:
    """Get client info"""
    with self._lock:
        return self._clients.get(client_id)

register

Register a client if possible.

Parameters:

Name Type Description Default
client_id str

The client identifier

required
environment PythonEnvironment | str

The environment to use. This environment will be used as default when an interpreter process is deleted.

'$system'

Raises:

Type Description
Exception

If the client limit is exceeded.

LookupError

If the given environment is invalid or not found.

Source code in cillow/server/client_manager.py
def register(self, client_id: str, environment: PythonEnvironment | str = "$system") -> None:
    """
    Register a client if possible.

    Args:
        client_id: The client identifier
        environment: The environment to use. This environment will be used as default when an interpreter process is deleted.

    Raises:
        Exception: If the client limit is exceeded.
        LookupError: If the given environment is invalid or not found.
    """
    with self._lock:
        if client_id in self._clients:
            return

        # Check if a new client can be accepted based on current limits.
        if not len(self._clients) < self.max_clients:
            raise Exception("Client limit exceeded. Try again later.")

        environment = validate_environment(environment or "$system")
        interpreter = _InterpreterProcess(environment)
        self._clients[client_id] = ClientInfo(
            default_environment=environment,
            current=CurrentContext(environment=environment, interpreter=interpreter),
            interpreters={environment: interpreter},
        )
        self.logger.info(f"Client {client_id!r} joined the server with {str(environment)!r} environment")

remove

Remove a client and stop all its interpreter processes.

Parameters:

Name Type Description Default
client_id str

The client identifier

required
Source code in cillow/server/client_manager.py
def remove(self, client_id: str) -> None:
    """
    Remove a client and stop all its interpreter processes.

    Args:
        client_id: The client identifier
    """
    with self._lock:
        if (client_info := self._clients.get(client_id)) is None:
            return

        for interpreter in client_info.interpreters.values():
            interpreter.stop()
        del self._clients[client_id]
        self.logger.info(f"Client {client_id!r} left the server")

switch_interpreter

Switch client to interpreter process based on the given environment.

Parameters:

Name Type Description Default
client_id str

The client identifier

required
environment PythonEnvironment | str

The environment to switch to

required

Raises:

Type Description
Exception

If unable to create new interpreter due to process limit

LookupError

If the given environment is invalid or not found

ValueError

If client is not found

Returns:

Type Description
PythonEnvironment

The valid Python environment value

Source code in cillow/server/client_manager.py
def switch_interpreter(self, client_id: str, environment: PythonEnvironment | str) -> PythonEnvironment:
    """
    Switch client to interpreter process based on the given environment.

    Args:
        client_id: The client identifier
        environment: The environment to switch to

    Raises:
        Exception: If unable to create new interpreter due to process limit
        LookupError: If the given environment is invalid or not found
        ValueError: If client is not found

    Returns:
        The valid Python environment value
    """
    with self._lock:
        if (client_info := self._clients.get(client_id)) is None:
            raise ValueError(f"Client {client_id!r} not found.")

        environment = validate_environment(environment)
        if client_info.current.environment == environment:
            return environment

        if not (interpreter := client_info.interpreters.get(environment)):
            # Check if client and total process limits are met to create new interpreter
            if (
                len(client_info.interpreters) < self.interpreters_per_client
                and self.total_active_processes < self.max_interpreters
            ):
                interpreter = _InterpreterProcess(environment)
                client_info.interpreters[environment] = interpreter
            else:
                raise Exception("Unable to create new interpreter due to process limit.")

        self._clients[client_id].current = CurrentContext(environment=environment, interpreter=interpreter)
        return environment

Request Worker

Request worker thread to handle incoming requests from clients.

Source code in cillow/server/request_worker.py
class RequestWorker(Thread, Logger):
    """Request worker thread to handle incoming requests from clients."""

    def __init__(
        self,
        queue: Queue,
        client_manager: ClientManager,
        callback: WriteCallback,
        stop_event: ThreadEvent,
    ) -> None:
        """
        Initialize the worker thread.

        Args:
            queue: The queue to receive requests from
            client_manager: The client manager instance to use for processing client requests
            callback: The callback to write responses to
            stop_event: The event to stop the worker thread
        """
        self._queue = queue
        self._client_manager = client_manager
        self._stop_event = stop_event
        self._callback = callback
        super().__init__(daemon=True)

    def _get_python_environment(self, client_id: bytes, type: Literal["current", "all", "default"]) -> None:  # noqa: A002
        """
        Get client's python environment of certain type.

        Args:
            client_id: The client id
            type: The type of python environment to get
        """
        if (client_info := self._client_manager.get_info(client_id.decode())) is None:
            return

        if type == "all":
            self._callback(client_id, b"request_done", pickle.dumps(list(client_info.interpreters)))
        elif type == "current":
            self._callback(client_id, b"request_done", pickle.dumps(client_info.current.environment))
        elif type == "default":
            self._callback(client_id, b"request_done", pickle.dumps(client_info.default_environment))

    def _modify_interpreter(
        self, client_id: bytes, environment: PythonEnvironment | str, mode: Literal["switch", "delete"]
    ) -> None:
        """
        Modify the client's interpreter based on the given environment and mode.

        Args:
            client_id: The client id
            environment: The environment to use
            mode: The mode to use
        """
        client_id_str = client_id.decode()
        # fmt: off
        _switch = lambda env: self._callback(
            client_id,
            b"request_done",
            pickle.dumps(self._client_manager.switch_interpreter(client_id_str, env))
        )
        # fmt: on
        try:
            if mode == "switch":
                _switch(environment)
            elif mode == "delete":
                self._client_manager.delete_interpreter(client_id_str, environment)
                _switch(self._client_manager.get_info(client_id_str).default_environment)  # type: ignore[union-attr]

        except Exception as e:
            print(str(e))
            self._callback(client_id, b"request_exception", str(e).encode())

    def _send_input_to_interpreter(self, client_id: bytes, **kwargs: Any) -> None:
        """
        Send input to interpreter.

        Args:
            client_id: The client id
            **kwargs: The input data
        """
        if (client_info := self._client_manager.get_info(client_id.decode())) is None:
            return

        for response in client_info.current.interpreter._send_input(**kwargs):
            self._callback(client_id, b"interpreter", pickle.dumps(response))

        # Tell client to not wait for more responses
        self._callback(client_id, b"request_done", b"")

    def run(self) -> None:
        """Run the worker thread."""
        while not self._stop_event.is_set():
            try:
                client_id, request_bytes = cast(tuple[bytes, bytes], self._queue.get(timeout=1))
                request = pickle.loads(request_bytes)
                try:
                    client_id_str = client_id.decode()
                    if isinstance(request, ModifyInterpreter):
                        # Register with default environment
                        self._client_manager.register(client_id_str, request.environment)
                    else:
                        self._client_manager.register(client_id_str)

                except Exception as e:
                    self._callback(client_id, b"request_exception", str(e).encode())
                    continue

                if isinstance(request, GetPythonEnvironment):
                    self._get_python_environment(client_id, request.type)
                elif isinstance(request, ModifyInterpreter):
                    self._modify_interpreter(client_id, request.environment, request.mode)
                elif isinstance(request, (SetEnvironmentVariables, InstallRequirements, RunCode, RunCommand)):
                    self._send_input_to_interpreter(client_id, **request.__dict__)
                elif isinstance(request, Disconnect):
                    self._client_manager.remove(client_id.decode())
                    self._callback(client_id, b"request_done", b"")
            except QueueEmptyError:
                continue
            except Exception as e:
                self.logger.error(f"{e.__class__.__name__}: {e!s}")

run

Run the worker thread.

Source code in cillow/server/request_worker.py
def run(self) -> None:
    """Run the worker thread."""
    while not self._stop_event.is_set():
        try:
            client_id, request_bytes = cast(tuple[bytes, bytes], self._queue.get(timeout=1))
            request = pickle.loads(request_bytes)
            try:
                client_id_str = client_id.decode()
                if isinstance(request, ModifyInterpreter):
                    # Register with default environment
                    self._client_manager.register(client_id_str, request.environment)
                else:
                    self._client_manager.register(client_id_str)

            except Exception as e:
                self._callback(client_id, b"request_exception", str(e).encode())
                continue

            if isinstance(request, GetPythonEnvironment):
                self._get_python_environment(client_id, request.type)
            elif isinstance(request, ModifyInterpreter):
                self._modify_interpreter(client_id, request.environment, request.mode)
            elif isinstance(request, (SetEnvironmentVariables, InstallRequirements, RunCode, RunCommand)):
                self._send_input_to_interpreter(client_id, **request.__dict__)
            elif isinstance(request, Disconnect):
                self._client_manager.remove(client_id.decode())
                self._callback(client_id, b"request_done", b"")
        except QueueEmptyError:
            continue
        except Exception as e:
            self.logger.error(f"{e.__class__.__name__}: {e!s}")