"""Sphinx test fixtures for pytest"""

import subprocess
import sys
from collections import namedtuple
from io import StringIO
from subprocess import PIPE
from typing import Any, Callable, Dict, Generator, Tuple

import pytest

from sphinx.testing import util
from sphinx.testing.util import SphinxTestApp, SphinxTestAppWrapperForSkipBuilding

DEFAULT_ENABLED_MARKERS = [
    (
        'sphinx(builder, testroot=None, freshenv=False, confoverrides=None, tags=None,'
        ' docutilsconf=None, parallel=0): arguments to initialize the sphinx test application.'
    ),
    'test_params(shared_result=...): test parameters.',
]


def pytest_configure(config):
    """Register custom markers"""
    for marker in DEFAULT_ENABLED_MARKERS:
        config.addinivalue_line('markers', marker)


@pytest.fixture(scope='session')
def rootdir() -> str:
    return None


class SharedResult:
    cache: Dict[str, Dict[str, str]] = {}

    def store(self, key: str, app_: SphinxTestApp) -> Any:
        if key in self.cache:
            return
        data = {
            'status': app_._status.getvalue(),
            'warning': app_._warning.getvalue(),
        }
        self.cache[key] = data

    def restore(self, key: str) -> Dict[str, StringIO]:
        if key not in self.cache:
            return {}
        data = self.cache[key]
        return {
            'status': StringIO(data['status']),
            'warning': StringIO(data['warning']),
        }


@pytest.fixture
def app_params(request: Any, test_params: Dict, shared_result: SharedResult,
               sphinx_test_tempdir: str, rootdir: str) -> Tuple[Dict, Dict]:
    """
    Parameters that are specified by 'pytest.mark.sphinx' for
    sphinx.application.Sphinx initialization
    """

    # ##### process pytest.mark.sphinx

    if hasattr(request.node, 'iter_markers'):  # pytest-3.6.0 or newer
        markers = request.node.iter_markers("sphinx")
    else:
        markers = request.node.get_marker("sphinx")
    pargs = {}
    kwargs: Dict[str, Any] = {}

    if markers is not None:
        # to avoid stacking positional args
        for info in reversed(list(markers)):
            for i, a in enumerate(info.args):
                pargs[i] = a
            kwargs.update(info.kwargs)

    args = [pargs[i] for i in sorted(pargs.keys())]

    # ##### process pytest.mark.test_params
    if test_params['shared_result']:
        if 'srcdir' in kwargs:
            raise pytest.Exception('You can not specify shared_result and '
                                   'srcdir in same time.')
        kwargs['srcdir'] = test_params['shared_result']
        restore = shared_result.restore(test_params['shared_result'])
        kwargs.update(restore)

    # ##### prepare Application params

    testroot = kwargs.pop('testroot', 'root')
    kwargs['srcdir'] = srcdir = sphinx_test_tempdir / kwargs.get('srcdir', testroot)

    # special support for sphinx/tests
    if rootdir and not srcdir.exists():
        testroot_path = rootdir / ('test-' + testroot)
        testroot_path.copytree(srcdir)

    return namedtuple('app_params', 'args,kwargs')(args, kwargs)  # type: ignore


@pytest.fixture
def test_params(request: Any) -> Dict:
    """
    Test parameters that are specified by 'pytest.mark.test_params'

    :param Union[str] shared_result:
       If the value is provided, app._status and app._warning objects will be
       shared in the parametrized test functions and/or test functions that
       have same 'shared_result' value.
       **NOTE**: You can not specify both shared_result and srcdir.
    """
    if hasattr(request.node, 'get_closest_marker'):  # pytest-3.6.0 or newer
        env = request.node.get_closest_marker('test_params')
    else:
        env = request.node.get_marker('test_params')
    kwargs = env.kwargs if env else {}
    result = {
        'shared_result': None,
    }
    result.update(kwargs)

    if (result['shared_result'] and not isinstance(result['shared_result'], str)):
        raise pytest.Exception('You can only provide a string type of value '
                               'for "shared_result" ')
    return result


@pytest.fixture(scope='function')
def app(test_params: Dict, app_params: Tuple[Dict, Dict], make_app: Callable,
        shared_result: SharedResult) -> Generator[SphinxTestApp, None, None]:
    """
    Provides the 'sphinx.application.Sphinx' object
    """
    args, kwargs = app_params
    app_ = make_app(*args, **kwargs)
    yield app_

    print('# testroot:', kwargs.get('testroot', 'root'))
    print('# builder:', app_.builder.name)
    print('# srcdir:', app_.srcdir)
    print('# outdir:', app_.outdir)
    print('# status:', '\n' + app_._status.getvalue())
    print('# warning:', '\n' + app_._warning.getvalue())

    if test_params['shared_result']:
        shared_result.store(test_params['shared_result'], app_)


@pytest.fixture(scope='function')
def status(app: SphinxTestApp) -> StringIO:
    """
    Back-compatibility for testing with previous @with_app decorator
    """
    return app._status


@pytest.fixture(scope='function')
def warning(app: SphinxTestApp) -> StringIO:
    """
    Back-compatibility for testing with previous @with_app decorator
    """
    return app._warning


@pytest.fixture()
def make_app(test_params: Dict, monkeypatch: Any) -> Generator[Callable, None, None]:
    """
    Provides make_app function to initialize SphinxTestApp instance.
    if you want to initialize 'app' in your test function. please use this
    instead of using SphinxTestApp class directory.
    """
    monkeypatch.setattr('sphinx.application.abspath', lambda x: x)

    apps = []
    syspath = sys.path[:]

    def make(*args, **kwargs):
        status, warning = StringIO(), StringIO()
        kwargs.setdefault('status', status)
        kwargs.setdefault('warning', warning)
        app_: Any = SphinxTestApp(*args, **kwargs)
        apps.append(app_)
        if test_params['shared_result']:
            app_ = SphinxTestAppWrapperForSkipBuilding(app_)
        return app_
    yield make

    sys.path[:] = syspath
    for app_ in reversed(apps):  # clean up applications from the new ones
        app_.cleanup()


@pytest.fixture
def shared_result() -> SharedResult:
    return SharedResult()


@pytest.fixture(scope='module', autouse=True)
def _shared_result_cache() -> None:
    SharedResult.cache.clear()


@pytest.fixture
def if_graphviz_found(app: SphinxTestApp) -> None:
    """
    The test will be skipped when using 'if_graphviz_found' fixture and graphviz
    dot command is not found.
    """
    graphviz_dot = getattr(app.config, 'graphviz_dot', '')
    try:
        if graphviz_dot:
            subprocess.run([graphviz_dot, '-V'], stdout=PIPE, stderr=PIPE)  # show version
            return
    except OSError:  # No such file or directory
        pass

    pytest.skip('graphviz "dot" is not available')


@pytest.fixture(scope='session')
def sphinx_test_tempdir(tmpdir_factory: Any) -> "util.path":
    """
    Temporary directory wrapped with `path` class.
    """
    tmpdir = tmpdir_factory.getbasetemp()
    return util.path(tmpdir).abspath()


@pytest.fixture
def tempdir(tmpdir: str) -> "util.path":
    """
    Temporary directory wrapped with `path` class.
    This fixture is for back-compatibility with old test implementation.
    """
    return util.path(tmpdir)


@pytest.fixture
def rollback_sysmodules():
    """
    Rollback sys.modules to its value before testing to unload modules
    during tests.

    For example, used in test_ext_autosummary.py to permit unloading the
    target module to clear its cache.
    """
    try:
        sysmodules = list(sys.modules)
        yield
    finally:
        for modname in list(sys.modules):
            if modname not in sysmodules:
                sys.modules.pop(modname)