Source code for graphviz.backend.execute

"""Run subprocesses with ``subprocess.run()`` and ``subprocess.Popen()``."""

import errno
import logging
import os
import subprocess
import sys
import typing

from .. import _compat

__all__ = ['run_check', 'ExecutableNotFound', 'CalledProcessError']


log = logging.getLogger(__name__)


BytesOrStrIterator = typing.Union[typing.Iterator[bytes],
                                  typing.Iterator[str]]


@typing.overload
def run_check(cmd: typing.Sequence[typing.Union[os.PathLike, str]], *,
              input_lines: typing.Optional[typing.Iterator[bytes]] = ...,
              encoding: None = ...,
              quiet: bool = ...,
              **kwargs) -> subprocess.CompletedProcess:
    """Accept bytes input_lines with default ``encoding=None```."""


@typing.overload
def run_check(cmd: typing.Sequence[typing.Union[os.PathLike, str]], *,
              input_lines: typing.Optional[typing.Iterator[str]] = ...,
              encoding: str,
              quiet: bool = ...,
              **kwargs) -> subprocess.CompletedProcess:
    """Accept string input_lines when given ``encoding``."""


@typing.overload
def run_check(cmd: typing.Sequence[typing.Union[os.PathLike, str]], *,
              input_lines: typing.Optional[BytesOrStrIterator] = ...,
              encoding: typing.Optional[str] = ...,
              capture_output: bool = ...,
              quiet: bool = ...,
              **kwargs) -> subprocess.CompletedProcess:
    """Accept bytes or string input_lines depending on ``encoding``."""


def run_check(cmd: typing.Sequence[typing.Union[os.PathLike, str]], *,
              input_lines: typing.Optional[BytesOrStrIterator] = None,
              encoding: typing.Optional[str] = None,
              quiet: bool = False,
              **kwargs) -> subprocess.CompletedProcess:
    """Run the command described by ``cmd``
        with ``check=True`` and return its completed process.

    Raises:
        CalledProcessError: if the returncode of the subprocess is non-zero.
    """
    log.debug('run %r', cmd)
    if not kwargs.pop('check', True):  # pragma: no cover
        raise NotImplementedError('check must be True or omited')

    if encoding is not None:
        kwargs['encoding'] = encoding

    kwargs.setdefault('startupinfo', _compat.get_startupinfo())

    try:
        if input_lines is not None:
            assert kwargs.get('input') is None
            assert iter(input_lines) is input_lines
            if kwargs.pop('capture_output'):
                kwargs['stdout'] = kwargs['stderr'] = subprocess.PIPE
            proc = _run_input_lines(cmd, input_lines, kwargs=kwargs)
        else:
            proc = subprocess.run(cmd, **kwargs)
    except OSError as e:
        if e.errno == errno.ENOENT:
            raise ExecutableNotFound(cmd) from e
        raise

    if not quiet and proc.stderr:
        _write_stderr(proc.stderr)

    try:
        proc.check_returncode()
    except subprocess.CalledProcessError as e:
        raise CalledProcessError(*e.args)

    return proc


def _run_input_lines(cmd, input_lines, *, kwargs):
    popen = subprocess.Popen(cmd, stdin=subprocess.PIPE, **kwargs)

    stdin_write = popen.stdin.write
    for line in input_lines:
        stdin_write(line)

    stdout, stderr = popen.communicate()
    return subprocess.CompletedProcess(popen.args, popen.returncode,
                                       stdout=stdout, stderr=stderr)


def _write_stderr(stderr) -> None:
    if isinstance(stderr, bytes):
        stderr_encoding = (getattr(sys.stderr, 'encoding', None)
                           or sys.getdefaultencoding())
        stderr = stderr.decode(stderr_encoding)

    sys.stderr.write(stderr)
    sys.stderr.flush()
    return None


[docs]class ExecutableNotFound(RuntimeError): """:exc:`RuntimeError` raised if the Graphviz executable is not found.""" _msg = ('failed to execute {!r}, ' 'make sure the Graphviz executables are on your systems\' PATH') def __init__(self, args) -> None: super().__init__(self._msg.format(*args))
[docs]class CalledProcessError(subprocess.CalledProcessError): """:exc:`~subprocess.CalledProcessError` raised if a subprocess ``returncode`` is not ``0``.""" # noqa: E501 def __str__(self) -> 'str': return f'{super().__str__()} [stderr: {self.stderr!r}]'