Source code for tuxemon.animation

# SPDX-License-Identifier: GPL-3.0
# Copyright (c) 2014-2023 William Edwards <shadowapex@gmail.com>, Benjamin Bean <superman2k5@gmail.com>
from __future__ import annotations

import logging
from collections import defaultdict
from math import cos, pi, sin, sqrt
from typing import (
    Any,
    Callable,
    DefaultDict,
    List,
    Mapping,
    Optional,
    Sequence,
    Tuple,
    Union,
)

import pygame

__all__ = ("Task", "Animation", "remove_animations_of")

from tuxemon.compat import Rect

# Type of the callbacks handled by ``TaskBase.schedule``: they are invoked
# without arguments and whatever they return is discarded.
ScheduledFunction = Callable[[], Any]

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Lifecycle states kept in the ``_state`` attribute of Task and Animation.
ANIMATION_NOT_STARTED = 0
ANIMATION_RUNNING = 1
# NOTE(review): ANIMATION_DELAYED is not referenced by the code visible in
# this module; delays are tracked through ``Animation.delay`` instead.
ANIMATION_DELAYED = 2
ANIMATION_FINISHED = 3


def check_number(value: Any) -> float:
    """
    Convert an object to ``float``, rejecting anything that is not a number.

    Parameters:
        value: Some object.

    Returns:
        ``value`` converted with ``float()``.

    Raises:
        ValueError: When ``value`` cannot be converted to a float. The
            original ``ValueError``/``TypeError`` is attached as the cause.

    """
    try:
        return float(value)
    except (ValueError, TypeError) as exc:
        # Normalise both failure modes to ValueError, but keep the offending
        # value and the original error visible for debugging.
        raise ValueError(f"not a number: {value!r}") from exc


def remove_animations_of(
    target: object,
    group: pygame.sprite.Group,
) -> None:
    """
    Find animations that target objects and remove those animations.

    Animations that were started with a delay which has not elapsed yet
    are matched too: until the delay is over their ``targets`` list is
    still empty and the objects are only recorded in ``_targets``.

    Parameters:
        target: Object whose animations should be removed.
        group: Pygame group where to remove the animations.

    """
    to_remove = []
    for ani in group.sprites():
        if not isinstance(ani, Animation):
            continue
        # Objects that already had their initial values gathered.
        active = [entry[0] for entry in ani.targets]
        # ``_targets`` holds everything passed to ``start``, including the
        # targets of animations that are still waiting for their delay.
        if target in active or target in ani._targets:
            to_remove.append(ani)
    group.remove(*to_remove)
class TaskBase(pygame.sprite.Sprite):
    """
    Sprite that stores callbacks grouped by the moment they must run.

    Subclasses list the accepted moments in ``_valid_schedules``; the first
    entry is the one used when ``schedule`` is called without ``when``.
    """

    _valid_schedules: Sequence[str] = []

    def __init__(self) -> None:
        super().__init__()
        # Maps a schedule name to the callbacks registered for it.
        self._callbacks: DefaultDict[
            str,
            List[ScheduledFunction],
        ] = defaultdict(list)

    def schedule(
        self,
        func: ScheduledFunction,
        when: Optional[str] = None,
    ) -> None:
        """
        Schedule a callback during operation of Task or Animation.

        The callback is any callable object. The moments at which it can
        be executed are the entries of the subclass' ``_valid_schedules``.

        If ``when`` is not passed, the first entry of ``_valid_schedules``
        is used.

        Parameters:
            func: Callable to schedule.
            when: Time when ``func`` is going to be called.

        Raises:
            ValueError: If ``when`` is not one of the valid schedules.

        """
        if when is None:
            when = self._valid_schedules[0]

        if when not in self._valid_schedules:
            raise ValueError(
                "invalid time to schedule a callback; "
                f"valid: {self._valid_schedules}"
            )
        self._callbacks[when].append(func)

    def _execute_callbacks(self, when: str) -> None:
        """
        Run every callback registered for ``when``, in registration order.

        Parameters:
            when: Name of the schedule whose callbacks must run.

        """
        # ``get`` is used instead of indexing so that the defaultdict does
        # not grow an empty list for schedules nobody registered for.
        for callback in self._callbacks.get(when, ()):
            callback()
class Task(TaskBase):
    """
    Execute functions at a later time and optionally loop it.

    This is a silly little class meant to make it easy to create delayed
    or looping events without any complicated hooks into pygame's clock
    or event loop.

    Tasks are created and must be added to a normal pygame group in order
    to function. This group must be updated, but not drawn.

    Setting the interval to 0 causes the callback to be called on the next
    update.

    Because the pygame clock returns milliseconds, the examples below use
    milliseconds. However, you are free to use whatever time unit you wish,
    as long as it is used consistently.

    Parameters:
        callback: Function to execute each interval.
        interval: Time between callbacks.
        times: Number of intervals. A negative value repeats forever.

    Raises:
        ValueError: If ``callback`` is not callable or ``times`` is 0.

    Examples:
        >>> task_group = pygame.sprite.Group()

        >>> # like a delay
        >>> def call_later():
        ...    pass
        >>> task = Task(call_later, 1000)
        >>> task_group.add(task)

        >>> # do something 24 times at 1 second intervals
        >>> task = Task(call_later, 1000, 24)

        >>> # do something every 2.5 seconds forever
        >>> task = Task(call_later, 2500, -1)

        >>> # pass arguments using functools.partial
        >>> from functools import partial
        >>> task = Task(partial(call_later, 1, 2, 3, key=value), 1000)

        >>> # a task must have at least one callback, but others can be
        >>> # added
        >>> task = Task(call_later, 2500, -1)
        >>> task.schedule(some_thing_else)

        >>> # chain tasks: when one task finishes, start another one
        >>> task = Task(call_later, 2500)
        >>> task.chain(something_else)

        When chaining tasks, do not add the chained tasks to a group.

    """

    _valid_schedules = ("on interval", "on finish", "on abort")

    def __init__(
        self,
        callback: ScheduledFunction,
        interval: float = 0,
        times: int = 1,
    ) -> None:
        if not callable(callback):
            raise ValueError("callback must be callable")

        if times == 0:
            raise ValueError("times must not be 0")

        super().__init__()
        self._interval = interval
        self._loops = times
        # Time accumulated since the last interval was completed.
        self._duration: float = 0
        self._chain: List[Task] = []
        self._state = ANIMATION_RUNNING
        # No ``when`` given: registered under "on interval".
        self.schedule(callback)

    def chain(
        self,
        callback: ScheduledFunction,
        interval: float = 0,
        times: int = 1,
    ) -> None:
        """
        Schedule a callback to execute when this one is finished.

        If you attempt to chain a task to a task that will never end,
        RuntimeError will be raised.

        This is a convenience to build a new Task from the arguments and
        hand it to ``chain_task``.

        Parameters:
            callback: Function to execute each interval.
            interval: Time between callbacks.
            times: Number of intervals.

        Raises:
            RuntimeError: If this task repeats forever.

        """
        self.chain_task(Task(callback, interval, times))

    def chain_task(self, *others: Task) -> Sequence[Task]:
        """
        Schedule Task(s) to execute when this one is finished.

        If you attempt to chain a task to a task that will never end,
        RuntimeError will be raised.

        Parameters:
            others: Task instances.

        Returns:
            The sequence of added Tasks.

        Raises:
            RuntimeError: If this task repeats forever.
            TypeError: If one of ``others`` is not a Task. Tasks listed
                before the offending one have already been chained.

        """
        if self._loops <= -1:
            raise RuntimeError("cannot chain to a task that never ends")

        for task in others:
            if not isinstance(task, Task):
                raise TypeError("only Task instances can be chained")
            self._chain.append(task)

        return others

    def update(self, dt: float) -> None:
        """
        Update the Task.

        The unit of time passed must match the one used in the
        constructor.

        Task will not 'make up for lost time'. If an interval was skipped
        because of a lagging clock, then callbacks will not be made to
        account for the missed ones.

        Parameters:
            dt: Time passed since last update.

        Raises:
            RuntimeError: If the task is not running anymore.

        """
        # States are plain ints: compare by value, not identity.
        if self._state != ANIMATION_RUNNING:
            raise RuntimeError("cannot update a task that is not running")

        self._duration += dt
        if self._duration >= self._interval:
            self._duration -= self._interval
            if self._loops >= 0:
                self._loops -= 1
                if self._loops == 0:
                    # loops counter is zero, finish now; ``finish`` runs
                    # the interval callbacks one last time
                    self.finish()
                else:
                    # not finished, there are iterations left
                    self._execute_callbacks("on interval")
            else:
                # negative loop count, run forever
                self._execute_callbacks("on interval")

    def finish(self) -> None:
        """Force task to finish, while executing callbacks."""
        if self._state == ANIMATION_RUNNING:
            self._state = ANIMATION_FINISHED
            self._execute_callbacks("on interval")
            self._execute_callbacks("on finish")
            self._execute_chain()
            self._cleanup()

    def is_finish(self) -> bool:
        """
        Returns:
            Whether the task is finished or not.

        """
        return self._state == ANIMATION_FINISHED

    def reset_delay(self, new_delay: float) -> None:
        """
        Reset the delay before starting task to make sure time left is
        equal or bigger to the provided value.

        Nothing changes when the time left is already at least
        ``new_delay``.

        Parameters:
            new_delay: The updated delay that should be respected.

        """
        time_left = self._interval - self._duration
        if new_delay > time_left:
            self._interval = new_delay
            self._duration = 0

    def abort(self) -> None:
        """
        Force task to finish, without the interval and finish callbacks.

        Chained tasks are dropped without being started. Callbacks that
        were scheduled with ``"on abort"`` are executed, but only if the
        task was still running.
        """
        was_running = self._state == ANIMATION_RUNNING
        self._state = ANIMATION_FINISHED
        if was_running:
            # "on abort" is an accepted schedule, so honour it here.
            self._execute_callbacks("on abort")
        self._cleanup()

    def _cleanup(self) -> None:
        """Forget the chained tasks and leave every sprite group."""
        self._chain = []
        self.kill()

    def _execute_chain(self) -> None:
        """Add the chained tasks to the groups this task belongs to."""
        # Must run before ``_cleanup``: ``kill`` empties ``groups()``.
        groups = self.groups()
        for task in self._chain:
            task.add(*groups)
class Animation(pygame.sprite.Sprite):
    """
    Change numeric values over time.

    To animate a target sprite/object's position, simply specify
    the target x/y values where you want the widget positioned at the
    end of the animation. Then call start while passing the target as
    the only parameter.

        >>> ani = Animation(x=100, y=100, duration=1000)
        >>> ani.start(sprite)

    The shorthand method of starting animations is to pass the
    targets as positional arguments in the constructor.

        >>> ani = Animation(sprite.rect, x=100, y=0)

    If you would rather specify relative values, then pass the
    ``relative`` keyword to the constructor and the values will be
    adjusted for you (keyword arguments given to ``start`` are ignored):

        >>> ani = Animation(x=100, y=100, duration=1000, relative=True)
        >>> ani.start(sprite)

    You can also specify a callback that will be executed when the
    animation finishes:

        >>> ani.callback = my_function

    Another optional callback is available that is called after
    each update:

        >>> ani.update_callback = post_update_function

    Animations must be added to a sprite group in order for them
    to be updated. If the sprite group that contains them is
    drawn then an exception will be raised, so you should create
    a sprite group only for containing Animations.

    You can cancel the animation by calling ``Animation.abort()``.

    When the animation has finished, then it will remove itself
    from the sprite group that contains it.

    You can optionally delay the start of the animation using the
    delay keyword.

    **Callable Attributes**

    Target attributes can also be callable: the attribute is then called
    with the new value instead of being assigned. Such attributes are
    'write only', so pass ``initial`` in the constructor to provide the
    starting value. When ``initial`` is not given, the attribute is read
    from the target and, if it is callable, called without arguments.
    Like target arguments, the initial value can also refer to a
    callable.

    NOTE: Specifying an initial value will set the initial value
          for all target names in the constructor. This
          limitation won't be resolved for a while.

    **Pygame Rects**

    The 'round_values' parameter will be set to True automatically
    if pygame rects are used as an animation target.

    Parameters:
        targets: Any valid python objects.
        delay: Delay time before the animation starts.
        round_values: Whether the values must be rounded to the nearest
            integer before being set.
        duration: Time duration of the animation. A duration of 0 (or
            less) jumps straight to the final values.
        transition: Transition to use in the animation. Can be the name of
            a method of :class:`AnimationTransition` or a callable with
            the same signature.
        initial: Initial value. Can be numeric or a callable that returns
            a numeric value. If ``None`` the value itself is used.
        relative: If the values are relative to the initial value. That is,
            in order to find the actual value one has to add the initial
            one.
        kwargs: Properties of the ``targets`` to be used, and their values.

    Raises:
        ValueError: If no property is given in ``kwargs``.
        AttributeError: If ``transition`` names an unknown transition.
        TypeError: If ``transition`` is not callable.

    """

    default_duration = 1000.0
    default_transition = "linear"

    def __init__(
        self,
        *targets: object,
        delay: float = 0,
        round_values: bool = False,
        duration: Optional[float] = None,
        transition: Union[str, Callable[[float], float], None] = None,
        initial: Union[float, Callable[[], float], None] = None,
        relative: bool = False,
        **kwargs: Any,
    ) -> None:
        super().__init__()
        # Optional hooks: they are only declared here and are looked up
        # with ``hasattr`` before being called.
        self.callback: Callable[[], Any]
        self.update_callback: Callable[[], Any]
        # (target, {attribute: (initial, final)}) once values are gathered.
        self.targets: List[
            Tuple[object, Mapping[str, Tuple[float, float]]]
        ] = []
        # Raw objects passed to ``start``.
        self._targets: Sequence[object] = []
        self.delay = delay
        self._state = ANIMATION_NOT_STARTED
        self._round_values = round_values
        self._duration = (
            self.default_duration if duration is None else duration
        )
        if transition is None:
            transition = self.default_transition
        if isinstance(transition, str):
            transition = getattr(AnimationTransition, transition)
        # Explicit check instead of ``assert`` so it survives ``python -O``.
        if not callable(transition):
            raise TypeError("transition must be a name or a callable")
        self._transition = transition
        self._initial = initial
        self._relative = relative
        self._elapsed = 0.0

        if not kwargs:
            raise ValueError("at least one property must be animated")
        self.props = kwargs

        if targets:
            self.start(*targets)

    def _get_value(self, target: object, name: str) -> float:
        """
        Get value of an attribute, even if it is a callable.

        When an initial value was given in the constructor it is used
        instead of the attribute.

        Parameters:
            target: Object that contains the attribute.
            name: Name of the attribute to get the value from.

        Returns:
            Attribute value.

        Raises:
            ValueError: If the value is not a number.

        """
        if self._initial is None:
            value = getattr(target, name)
        else:
            value = self._initial

        if callable(value):
            value = value()

        return check_number(value)

    def _set_value(self, target: object, name: str, value: float) -> None:
        """
        Set a value on some other object.

        If the name references a callable type, then
        the object of that name will be called with 'value'
        as the first and only argument.

        Because callables are 'write only', there is no way to
        determine the initial value. You can supply an initial value
        in the constructor as a value or reference to a callable object.

        Parameters:
            target: Object to be modified.
            name: Name of attribute to be modified.
            value: New value of the attribute.

        """
        if self._round_values:
            value = int(round(value, 0))

        attr = getattr(target, name)
        if callable(attr):
            attr(value)
        else:
            setattr(target, name, value)

    def update(self, dt: float) -> None:
        """
        Update the animation.

        The unit of time passed must match the one used in the
        constructor.

        Make sure that you start the animation, otherwise your
        animation will not be changed during update().

        Updating an animation that is not running (not started yet or
        already finished) does nothing.

        Parameters:
            dt: Time passed since last update.

        """
        # States are plain ints: compare by value, not identity.
        if self._state != ANIMATION_RUNNING:
            return

        self._elapsed += dt
        if self.delay > 0:
            if self._elapsed > self.delay:
                self._elapsed -= self.delay
                # Clear the delay *before* gathering: gathering ends with
                # ``update(0)``, which must see the animation as no longer
                # delayed so it applies the first frame instead of waiting
                # (and subtracting the delay) once more.
                self.delay = 0
                self._gather_initial_values()
            return

        if self._duration > 0:
            p = min(1.0, self._elapsed / self._duration)
        else:
            # Nothing to interpolate over; avoids a division by zero.
            p = 1.0
        t = self._transition(p)
        for target, props in self.targets:
            for name, (a, b) in props.items():
                value = (a * (1.0 - t)) + (b * t)
                self._set_value(target, name, value)

        if hasattr(self, "update_callback"):
            self.update_callback()

        if p >= 1:
            self.finish()

    def finish(self) -> None:
        """
        Force animation to finish, apply transforms, and execute callbacks.

        * Update callback will be called because the value is changed.
        * Final callback ('callback') will be called.
        * Final values will be applied.
        * Animation will be removed from group.

        """
        for target, props in self.targets:
            for name, (_, final) in props.items():
                self._set_value(target, name, final)

        if hasattr(self, "update_callback"):
            self.update_callback()

        self.abort()

    def abort(self) -> None:
        """
        Force animation to finish, without any cleanup.

        * Update callback will not be executed.
        * Final callback will be executed.
        * Values will not change.
        * Animation will be removed from group.

        """
        self._state = ANIMATION_FINISHED
        self.targets = []
        self.kill()
        if hasattr(self, "callback"):
            self.callback()

    def start(self, *targets: object, **kwargs: Any) -> None:
        """
        Start the animation on a target sprite/object.

        Targets must have the attributes that were set when
        this animation was created.

        Parameters:
            targets: Any valid python objects.
            kwargs: Ignored.

        Raises:
            RuntimeError: If the animation is already started.

        """
        # TODO: weakref the targets
        if self._state != ANIMATION_NOT_STARTED:
            raise RuntimeError("animation was already started")

        self._state = ANIMATION_RUNNING
        self._targets = targets

        # With a delay, the initial values are gathered by ``update`` once
        # the delay has elapsed.
        if self.delay == 0:
            self._gather_initial_values()

    def _gather_initial_values(self) -> None:
        """
        Record the (initial, final) pair of every animated attribute.

        Rect targets switch on value rounding. Ends by applying the
        current frame with ``update(0)``.

        Raises:
            ValueError: If an initial or final value is not a number.

        """
        self.targets = []
        for target in self._targets:
            props = {}
            if isinstance(target, Rect):
                self._round_values = True
            for name, value in self.props.items():
                # ``_get_value`` already validates the initial value.
                initial = self._get_value(target, name)
                check_number(value)
                if self._relative:
                    value += initial
                props[name] = initial, value
            self.targets.append((target, props))

        self.update(0)
class AnimationTransition:
    """
    Collection of animation functions to be used with the Animation object.

    Easing Functions ported to Kivy from the Clutter Project
    http://www.clutter-project.org/docs/clutter/stable/ClutterAlpha.html

    The `progress` parameter in each animation function is in the range
    0-1.
    """

    @staticmethod
    def linear(progress: float) -> float:
        """Return ``progress`` unchanged."""
        return progress

    @staticmethod
    def in_quad(progress: float) -> float:
        """Quadratic curve starting slowly."""
        return progress * progress

    @staticmethod
    def out_quad(progress: float) -> float:
        """Quadratic curve ending slowly."""
        return -1.0 * progress * (progress - 2.0)

    @staticmethod
    def in_out_quad(progress: float) -> float:
        """Quadratic curve that is slow at both ends."""
        doubled = progress * 2
        if doubled < 1:
            return 0.5 * doubled * doubled
        rest = doubled - 1.0
        return -0.5 * (rest * (rest - 2.0) - 1.0)

    @staticmethod
    def in_cubic(progress: float) -> float:
        """Cubic curve starting slowly."""
        return progress * progress * progress

    @staticmethod
    def out_cubic(progress: float) -> float:
        """Cubic curve ending slowly."""
        back = progress - 1.0
        return back * back * back + 1.0

    @staticmethod
    def in_out_cubic(progress: float) -> float:
        """Cubic curve that is slow at both ends."""
        doubled = progress * 2
        if doubled < 1:
            return 0.5 * doubled * doubled * doubled
        back = doubled - 2
        return 0.5 * (back * back * back + 2.0)

    @staticmethod
    def in_quart(progress: float) -> float:
        """Quartic curve starting slowly."""
        return progress * progress * progress * progress

    @staticmethod
    def out_quart(progress: float) -> float:
        """Quartic curve ending slowly."""
        back = progress - 1.0
        return -1.0 * (back * back * back * back - 1.0)

    @staticmethod
    def in_out_quart(progress: float) -> float:
        """Quartic curve that is slow at both ends."""
        doubled = progress * 2
        if doubled < 1:
            return 0.5 * doubled * doubled * doubled * doubled
        back = doubled - 2
        return -0.5 * (back * back * back * back - 2.0)

    @staticmethod
    def in_quint(progress: float) -> float:
        """Quintic curve starting slowly."""
        return progress * progress * progress * progress * progress

    @staticmethod
    def out_quint(progress: float) -> float:
        """Quintic curve ending slowly."""
        back = progress - 1.0
        return back * back * back * back * back + 1.0

    @staticmethod
    def in_out_quint(progress: float) -> float:
        """Quintic curve that is slow at both ends."""
        doubled = progress * 2
        if doubled < 1:
            return 0.5 * doubled * doubled * doubled * doubled * doubled
        back = doubled - 2.0
        return 0.5 * (back * back * back * back * back + 2.0)

    @staticmethod
    def in_sine(progress: float) -> float:
        """Sinusoidal curve starting slowly."""
        return -1.0 * cos(progress * (pi / 2.0)) + 1.0

    @staticmethod
    def out_sine(progress: float) -> float:
        """Sinusoidal curve ending slowly."""
        return sin(progress * (pi / 2.0))

    @staticmethod
    def in_out_sine(progress: float) -> float:
        """Sinusoidal curve that is slow at both ends."""
        return -0.5 * (cos(pi * progress) - 1.0)

    @staticmethod
    def in_expo(progress: float) -> float:
        """Exponential curve starting slowly; exactly 0 at progress 0."""
        if progress == 0:
            return 0.0
        return float(pow(2, 10 * (progress - 1.0)))

    @staticmethod
    def out_expo(progress: float) -> float:
        """Exponential curve ending slowly; exactly 1 at progress 1."""
        if progress == 1.0:
            return 1.0
        return float(-pow(2, -10 * progress) + 1.0)

    @staticmethod
    def in_out_expo(progress: float) -> float:
        """Exponential curve that is slow at both ends."""
        if progress == 0:
            return 0.0
        if progress == 1.0:
            return 1.0
        doubled = progress * 2
        if doubled < 1:
            return float(0.5 * pow(2, 10 * (doubled - 1.0)))
        rest = doubled - 1.0
        return float(0.5 * (-pow(2, -10 * rest) + 2.0))

    @staticmethod
    def in_circ(progress: float) -> float:
        """Circular curve starting slowly."""
        return -1.0 * (sqrt(1.0 - progress * progress) - 1.0)

    @staticmethod
    def out_circ(progress: float) -> float:
        """Circular curve ending slowly."""
        back = progress - 1.0
        return sqrt(1.0 - back * back)

    @staticmethod
    def in_out_circ(progress: float) -> float:
        """Circular curve that is slow at both ends."""
        doubled = progress * 2
        if doubled < 1:
            return -0.5 * (sqrt(1.0 - doubled * doubled) - 1.0)
        back = doubled - 2.0
        return 0.5 * (sqrt(1.0 - back * back) + 1.0)

    @staticmethod
    def in_elastic(progress: float) -> float:
        """Oscillating curve whose swings grow towards the end."""
        period = 0.3
        shift = period / 4.0
        if progress == 1:
            return 1.0
        back = progress - 1.0
        wave = sin((back - shift) * (2 * pi) / period)
        return float(-(pow(2, 10 * back) * wave))

    @staticmethod
    def out_elastic(progress: float) -> float:
        """Oscillating curve whose swings fade towards the end."""
        period = 0.3
        shift = period / 4.0
        if progress == 1:
            return 1.0
        wave = sin((progress - shift) * (2 * pi) / period)
        return float(pow(2, -10 * progress) * wave + 1.0)

    @staticmethod
    def in_out_elastic(progress: float) -> float:
        """Oscillating curve that swings around the middle."""
        period = 0.3 * 1.5
        shift = period / 4.0
        doubled = progress * 2
        if doubled == 2:
            return 1.0
        # Both halves work on the same shifted value and the same wave.
        rest = doubled - 1.0
        wave = sin((rest - shift) * (2.0 * pi) / period)
        if doubled < 1:
            return float(-0.5 * (pow(2, 10 * rest) * wave))
        return float(pow(2, -10 * rest) * wave * 0.5 + 1.0)

    @staticmethod
    def in_back(progress: float) -> float:
        """Curve that first moves backwards, then forwards."""
        return progress * progress * ((1.70158 + 1.0) * progress - 1.70158)

    @staticmethod
    def out_back(progress: float) -> float:
        """Curve that overshoots the end before settling."""
        back = progress - 1.0
        return back * back * ((1.70158 + 1) * back + 1.70158) + 1.0

    @staticmethod
    def in_out_back(progress: float) -> float:
        """Curve that overshoots at both ends."""
        doubled = progress * 2.0
        overshoot = 1.70158 * 1.525
        if doubled < 1:
            return 0.5 * (
                doubled * doubled * ((overshoot + 1.0) * doubled - overshoot)
            )
        back = doubled - 2.0
        return 0.5 * (
            back * back * ((overshoot + 1.0) * back + overshoot) + 2.0
        )

    @staticmethod
    def _out_bounce_internal(t: float, d: float) -> float:
        """Bounce curve for time ``t`` out of a total duration ``d``."""
        ratio = t / d
        if ratio < (1.0 / 2.75):
            return 7.5625 * ratio * ratio
        elif ratio < (2.0 / 2.75):
            ratio -= 1.5 / 2.75
            return 7.5625 * ratio * ratio + 0.75
        elif ratio < (2.5 / 2.75):
            ratio -= 2.25 / 2.75
            return 7.5625 * ratio * ratio + 0.9375
        else:
            ratio -= 2.625 / 2.75
            return 7.5625 * ratio * ratio + 0.984375

    @staticmethod
    def _in_bounce_internal(t: float, d: float) -> float:
        """Mirror image of ``_out_bounce_internal``."""
        mirrored = AnimationTransition._out_bounce_internal(d - t, d)
        return 1.0 - mirrored

    @staticmethod
    def in_bounce(progress: float) -> float:
        """Bouncing curve, bounces happen at the start."""
        return AnimationTransition._in_bounce_internal(progress, 1.0)

    @staticmethod
    def out_bounce(progress: float) -> float:
        """Bouncing curve, bounces happen at the end."""
        return AnimationTransition._out_bounce_internal(progress, 1.0)

    @staticmethod
    def in_out_bounce(progress: float) -> float:
        """Bouncing curve, bounces happen at both ends."""
        doubled = progress * 2.0
        if doubled < 1.0:
            first = AnimationTransition._in_bounce_internal(doubled, 1.0)
            return first * 0.5
        second = AnimationTransition._out_bounce_internal(doubled - 1.0, 1.0)
        return second * 0.5 + 0.5