Source code for clanimtk.core

# -*- coding: utf-8 -*-
"""Core functionality for clanimtk. This module is only intended to be used as
an internal part of clanimtk. The relevant, public functionality in this module
is exposed in the :py:module:: clanimtk.decorator module.

.. module:: core
    :synopsis: Core functionality for clanimtk.
.. moduleauthor:: Simon Larsén <slarse@kth.se>
"""
import asyncio
import functools
import itertools
import sys
from typing import Optional

from clanimtk import util
from clanimtk.cli import BACKLINE, BACKSPACE

ANNOTATED = '_clanimtk_annotated'


[docs]class Animate: """A wrapper class for adding a CLI animation to a slow-running function. Animate uses introspection to figure out if the function it decorates is synchronous (defined with 'def') or asynchronous (defined with 'async def'), and works with both. .. DANGER:: This class is not intended to be used directly, but rather through the animate function. """ def __init__(self, func=None, *, animation_gen, step=.1): """Constructor. Args: func: If Animate is used without kwargs, then the function it decorates is passed in here. Otherwise, this is None. This argument should NOT be given directly via keyword assignment. animation_gen: A generator that yields strings for the animation. step: Seconds between each animation frame. """ if not callable(func): raise TypeError("argument 'func' for {!r} must be " "callable".format(self.__class__.__name__)) self._raise_if_annotated(func) self._func = func self._animation_gen = animation_gen self._step = step functools.update_wrapper(self, func) def __call__(self, *args, **kwargs): """Make the class instance callable. func (function): If the """ supervisor = util.get_supervisor(self._func) return supervisor(self._animation_gen, self._step, *args, **kwargs) def _raise_if_annotated(self, func): """Raise TypeError if a function is decorated with Annotate, as such functions cause visual bugs when decorated with Animate. Animate should be wrapped by Annotate instead. Args: func (function): Any callable. Raises: TypeError """ if hasattr(func, ANNOTATED) and getattr(func, ANNOTATED): msg = ('Functions decorated with {!r} ' 'should not be decorated with {!r}.\n' 'Please reverse the order of the decorators!'.format( self.__class__.__name__, Annotate.__name__)) raise TypeError(msg)
[docs]class Annotate: """A decorator meant for decorating functions that are decorated with the animation decorator. It prints a message to stdout before and/or after the function has finished. .. DANGER:: This decorator can also be used standalone, but you should NOT decorate a function that is decorated with Annotate with Animate. That is to say, the decorator order must be like this: .. code-block:: python @Annotate @Animate def some_function() pass """ def __init__(self, *, start_msg: Optional[str] = None, end_msg: Optional[str] = None, start_no_nl: bool = False): """Note that both arguments are keyword only arguments. Args: start_msg: A message to print before the function runs. end_msg: A message to print after the function has finished. start_no_nl: If True, no newline is appended after the start_msg. """ if start_msg is None and end_msg is None: raise ValueError( "At least one of 'start_msg' and 'end_msg' must be specified.") self._raise_if_not_none_nor_string(start_msg, "start_msg") self._raise_if_not_none_nor_string(end_msg, "end_msg") self._start_msg = start_msg self._end_msg = end_msg self._start_no_nl = start_no_nl def _raise_if_not_none_nor_string(self, msg, parameter_name): if msg is not None and not isinstance(msg, str): raise TypeError(f"Bad operand type for {self.__class__.__name__!r}" f".{parameter_name}: {type(msg)}") def _start_print(self): """Print the start message with or without newline depending on the self._start_no_nl variable. """ if self._start_no_nl: sys.stdout.write(self._start_msg) sys.stdout.flush() else: print(self._start_msg) def __call__(self, func, *args, **kwargs): """ Args: func: The annotated function. args: Arguments for func. kwargs: Keyword arguments for func. """ if asyncio.iscoroutinefunction(func): return self._async_call(func, *args, **kwargs) return self._sync_call(func, *args, **kwargs) def _sync_call(self, func): """__call__ function for regular synchronous functions. Args: func: The annotated function. args: Arguments for func. kwargs: Keyword arguments for func. """ @functools.wraps(func) def wrapper(*args, **kwargs): if self._start_msg: self._start_print() result = func(*args, **kwargs) if self._end_msg: print(self._end_msg) return result setattr(wrapper, ANNOTATED, True) return wrapper def _async_call(self, func): """__call__ function for asyncio coroutines. Args: func: The annotated function. args: Arguments for func. kwargs: Keyword arguments for func. """ @functools.wraps(func) async def wrapper(*args, **kwargs): if self._start_msg: print(self._start_msg) result = await func(*args, **kwargs) if self._end_msg: print(self._end_msg) return result setattr(wrapper, ANNOTATED, True) return wrapper
[docs]class Animation: """A wrapper class for FrameFunctions. It automatically backs up the cursor after each frame, and provides reset and erase functionality. .. DANGER:: Do not use directly, use the animation function instead. """ def __init__(self, frame_function, current_generator=None, back_up_generator=None, animation_args=None, animation_kwargs=None): self._frame_function = frame_function self._current_generator = current_generator self._back_up_generator = back_up_generator self._animation_args = animation_args self._animation_kwargs = animation_kwargs self._current_frame = ""
[docs] def reset(self): """Reset the current animation generator.""" animation_gen = self._frame_function(*self._animation_args, **self._animation_kwargs) self._current_generator = itertools.cycle( util.concatechain(animation_gen, self._back_up_generator))
[docs] def get_erase_frame(self): """Return a frame that completely erases the current frame, and then backs up. Assumes that the current frame is of constant width.""" lines = self._current_frame.split('\n') width = len(lines[0]) height = len(lines) line = ' ' * width if height == 1: frame = line + BACKSPACE * width else: frame = '\n'.join([line] * height) + BACKLINE * (height - 1) return frame
def __next__(self): self._current_frame = next(self._current_generator) return self._current_frame def __call__(self, *args, **kwargs): cls = self.__class__ self._animation_args = args self._animation_kwargs = kwargs self._back_up_generator = _get_back_up_generator( self._frame_function, *args, **kwargs) self.reset() return cls(self._frame_function, self._current_generator, self._back_up_generator, args, kwargs) def __iter__(self): return iter(self._current_generator)
def _get_back_up_generator(frame_function, *args, **kwargs): """Create a generator for the provided animation function that backs up the cursor after a frame. Assumes that the animation function provides a generator that yields strings of constant width and height. Args: frame_function: A function that returns a FrameGenerator. args: Arguments for frame_function. kwargs: Keyword arguments for frame_function. Returns: a generator that generates backspace/backline characters for the animation func generator. """ lines = next(frame_function(*args, **kwargs)).split('\n') width = len(lines[0]) height = len(lines) if height == 1: return util.BACKSPACE_GEN(width) return util.BACKLINE_GEN(height) def _backspaced_single_line_animation(animation_, *args, **kwargs): """Turn an animation into an automatically backspaced animation. Args: animation: A function that returns a generator that yields strings for animation frames. args: Arguments for the animation function. kwargs: Keyword arguments for the animation function. Returns: the animation generator, with backspaces applied to each but the first frame. """ animation_gen = animation_(*args, **kwargs) yield next(animation_gen) # no backing up on the first frame yield from util.concatechain( util.BACKSPACE_GEN(kwargs['width']), animation_gen)