# SPDX-License-Identifier: GPL-3.0
# Copyright (c) 2014-2023 William Edwards <shadowapex@gmail.com>, Benjamin Bean <superman2k5@gmail.com>
from __future__ import annotations
import inspect
import logging
import os.path
import sys
import warnings
from abc import ABCMeta
from importlib import import_module
from typing import (
Any,
Callable,
Dict,
Generator,
List,
Mapping,
Optional,
Sequence,
Set,
Tuple,
Type,
TypeVar,
Union,
overload,
)
import pygame
from pygame.rect import Rect
from tuxemon import graphics, prepare
from tuxemon.animation import Animation, Task, remove_animations_of
from tuxemon.constants import paths
from tuxemon.platform.events import PlayerInput
from tuxemon.session import local_session
from tuxemon.sprite import Sprite, SpriteGroup
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Type variable bound to ``State``. It is used in the ``StateManager``
# overloads so that passing an instance of a ``State`` subclass is typed as
# returning that same subclass.
StateType = TypeVar("StateType", bound="State")
class State:
    """
    Prototype class for game states.

    All states should inherit from it. No direct instances of this
    class should be created. Update must be overloaded in the child class.

    Overview of Methods:
        * resume - Called each time state is updated for first time
        * update - Called each frame while state is active
        * process_event - Called when there is a new input event
        * pause - Called when state is no longer active
        * shutdown - Called before state is destroyed

    Attributes:
        rect: Area of the screen the state will be drawn on.
        transparent: If True, ignore all background/borders.
        force_draw: If True, state will never be skipped in drawing phase.

    """

    # NOTE(review): ``__metaclass__`` is the Python 2 spelling. Under
    # Python 3 it is just a class attribute, so this class is not actually
    # created with ABCMeta. It is left untouched here so that the metaclass
    # seen by subclasses does not change.
    __metaclass__ = ABCMeta

    rect = Rect((0, 0), prepare.SCREEN_SIZE)
    transparent = False  # ignore all background/borders
    force_draw = False  # draw even if completely under another state

    def __init__(self) -> None:
        """
        Set up the timers, animation group and sprite group of the state.

        Important! The state must be ready to be drawn after this is called.

        """
        self.start_time = 0.0
        self.current_time = 0.0

        # Only animations and tasks
        self.animations = pygame.sprite.Group()

        # All sprites that draw on the screen
        self.sprites: SpriteGroup[Sprite] = SpriteGroup()

        # TODO: fix local session
        self.client = local_session.client

    @property
    def name(self) -> str:
        """Name of the state, which is the name of its class."""
        return self.__class__.__name__

    def load_sprite(self, filename: str, **kwargs: Any) -> Sprite:
        """
        Load a sprite and add it to this state.

        Parameters:
            filename: Filename, relative to the resources folder.
            kwargs: Keyword arguments to pass to the Rect constructor. Can be
                any value used by Rect, or layer.

        Returns:
            Loaded sprite.

        """
        # ``layer`` is consumed here; everything else goes to the loader.
        layer = kwargs.pop("layer", 0)
        sprite = graphics.load_sprite(filename, **kwargs)
        self.sprites.add(sprite, layer=layer)
        return sprite

    def animate(self, *targets: Any, **kwargs: Any) -> Animation:
        """
        Animate something in this state.

        Animations are processed even while state is inactive.

        Parameters:
            targets: Targets of the Animation.
            kwargs: Attributes and their final value.

        Returns:
            Resulting animation.

        """
        ani = Animation(*targets, **kwargs)
        self.animations.add(ani)
        return ani

    def task(self, *args: Any, **kwargs: Any) -> Task:
        """
        Create a task for this state.

        Tasks are processed even while state is inactive.
        If you want to pass positional arguments, use functools.partial.

        Parameters:
            args: Function to be called.
            kwargs: Keyword parameters passed to the task.

        Returns:
            The created task.

        """
        task = Task(*args, **kwargs)
        self.animations.add(task)
        return task

    def remove_animations_of(self, target: Any) -> None:
        """
        Given an object, remove any animations that it is used with.

        Parameters:
            target: Object whose animations should be removed.

        """
        # Delegates to the module-level helper imported from
        # ``tuxemon.animation`` (same name as this method).
        remove_animations_of(target, self.animations)

    def process_event(self, event: PlayerInput) -> Optional[PlayerInput]:
        """
        Handles player input events.

        This function is only called when the
        player provides input such as pressing a key or clicking the mouse.

        Since this is part of a chain of event handlers, the return value
        from this method becomes input for the next one. Returning None
        signifies that this method has dealt with an event and wants it
        exclusively. Return the event and others can use it as well.

        You should return None if you have handled input here.

        Parameters:
            event: Player input event.

        Returns:
            ``None`` if the event should not be passed to the next
            handlers. Otherwise, return the input event.

        """
        # The base implementation handles nothing and passes the event on.
        return event

    def update(self, time_delta: float) -> None:
        """
        Time update function for state. Must be overloaded in children.

        The base implementation advances the animations/tasks and the
        sprites of the state.

        Parameters:
            time_delta: Amount of time in fractional seconds since last update.

        """
        self.animations.update(time_delta)
        self.sprites.update(time_delta)

    def draw(self, surface: pygame.surface.Surface) -> None:
        """
        Render the state to the surface passed. Must be overloaded in children.

        Do not change the state of any game entities. Every draw should be the
        same for a given game time. Any game changes should be done during
        update.

        Parameters:
            surface: Surface to be rendered onto.

        """

    def startup(self, **kwargs: Any) -> None:
        """
        DEPRECATED - Use __init__ instead.

        Called when scene is added to the state stack.

        This will be called:
        * after state is pushed and before next update
        * just once during the life of a state

        Example uses: loading images, configuration, sounds, etc.

        Parameters:
            kwargs: Configuration options.

        """

    def resume(self) -> None:
        """
        Called before update when state is newly in focus.

        This will be called:
        * after startup and before next update
        * after a pop operation which causes this state to be in focus

        After being called, state will begin to receive player input.
        Could be called several times over lifetime of state.

        Example uses: starting music, open menu, starting animations,
        timers, etc.

        """

    def pause(self) -> None:
        """
        Called when state is pushed back in the stack, allowed to pause.

        This will be called:
        * after update when state is pushed back
        * before being shutdown

        After being called, state will no longer receive player input.
        Could be called several times over lifetime of state.

        Example uses: stopping music, sounds, fading out, making state
        graphics dim, etc.

        """

    def shutdown(self) -> None:
        """
        Called when state is removed from stack and will be destroyed.

        This will be called:
        * after update when state is popped

        Make sure to release any references to objects that may cause
        cyclical dependencies.

        """
class StateManager:
    """
    Manage game states as a stack, plus a queue of states to run next.

    The top of the stack (index 0) is the current state. Queued states
    are started when the top state is popped.

    Parameters:
        package: Name of package to search for states.
        on_state_change: Optional callback to be executed when top state
            changes.

    """

    def __init__(
        self,
        package: str,
        on_state_change: Optional[Callable[[], None]] = None,
    ) -> None:
        self.package = package
        # TODO: consider API for handling hooks
        self._on_state_change_hook = on_state_change
        # (state name, kwargs) pairs waiting to replace the top state.
        self._state_queue: List[Tuple[str, Mapping[str, Any]]] = []
        # Active states; index 0 is the top of the stack.
        self._state_stack: List[State] = []
        # Registered state classes, keyed by class name.
        self._state_dict: Dict[str, Type[State]] = {}
        # States whose ``resume`` must be called before their next update.
        self._resume_set: Set[State] = set()

    def auto_state_discovery(self) -> None:
        """
        Scan a folder, load states found in it, and register them.

        TODO: this functionality duplicates the plugin code.

        """
        # The first component of the package name is dropped; the rest is
        # resolved relative to ``paths.LIBDIR``.
        state_folder = os.path.join(paths.LIBDIR, *self.package.split(".")[1:])
        exclude_endings = (".py", ".pyc", ".pyo", "__pycache__")
        logger.debug("loading game states from %s", state_folder)
        for folder in os.listdir(state_folder):
            if folder.endswith(exclude_endings):
                continue
            for state in self.collect_states_from_path(folder):
                self.register_state(state)

    def register_state(self, state: Type[State]) -> None:
        """
        Add a state class.

        The class is stored under its ``__name__``; registering another
        class with the same name replaces the previous one.

        Parameters:
            state: The state to add.

        """
        name = state.__name__
        logger.debug("loading state: %s", name)
        self._state_dict[name] = state

    def _instance(self, state_name: str, **kwargs: Any) -> State:
        """
        Create new instance of State. Builder pattern, WIP.

        Parameters:
            state_name: Name of state to create.
            kwargs: Arguments passed to the constructor of the state.

        Returns:
            The new state instance.

        Raises:
            RuntimeError: If no state is registered under ``state_name``.

        """
        try:
            state = self._state_dict[state_name]
        except KeyError:
            raise RuntimeError(f"Cannot find state: {state_name}") from None
        return state(**kwargs)

    @staticmethod
    def collect_states_from_module(
        import_name: str,
    ) -> Generator[Type[State], None, None]:
        """
        Given a module, return all classes in it that are a game state.

        The module must already be imported (it is looked up in
        ``sys.modules``). Every class found in the module namespace that is
        a subclass of ``State`` is yielded.

        NOTE(review): no filtering of abstract base classes is done here,
        so ``State`` itself is yielded too if the module exposes it.

        Parameters:
            import_name: Name of module

        Yields:
            Each game state class.

        """
        module = sys.modules[import_name]
        for _, cls in inspect.getmembers(module, inspect.isclass):
            if issubclass(cls, State):
                yield cls

    def collect_states_from_path(
        self,
        folder: str,
    ) -> Generator[Type[State], None, None]:
        """
        Load states from disk, but do not register it.

        Parameters:
            folder: Folder to load from.

        Yields:
            Each game state class.

        Raises:
            Exception: Any error raised while importing or scanning the
                module is logged and then re-raised.

        """
        try:
            import_name = self.package + "." + folder
            import_module(import_name)
            yield from self.collect_states_from_module(import_name)
        except Exception as e:
            logger.error(e)
            logger.error(
                "%s failed to load or is not a valid game package",
                folder,
            )
            raise

    def update(self, time_delta: float) -> None:
        """
        Run update on all active states, while doing some internal
        housekeeping.

        WIP. This may change at some point, especially handling of paused
        states.

        Parameters:
            time_delta: Amount of time passed since last frame.

        """
        logger.debug("updating states")
        # ``active_states`` is a copy, so states may push/pop while updating.
        for state in self.active_states:
            self._check_resume(state)
            state.update(time_delta)

    def _check_resume(self, state: State) -> None:
        """
        Call resume on states that are in the resume set.

        Typically states will resume right before an update, but if a state
        is paused or popped before its next update, the resume would be
        missed. This is used to enforce the symmetry between resume/pause
        calls.

        Parameters:
            state: State to check for resume

        """
        if state in self._resume_set:
            logger.debug("removing %s from resume set", state.name)
            self._resume_set.remove(state)
            state.resume()

    def query_all_states(self) -> Mapping[str, Type[State]]:
        """
        Return a dictionary of all loaded states.

        Keys are state names, values are State classes.

        Returns:
            Dictionary of all loaded states (a copy).

        """
        return self._state_dict.copy()

    def queue_state(self, state_name: str, **kwargs: Any) -> None:
        """
        Queue a state to be pushed after the top state is popped or replaced.

        Use this to chain execution of states, without causing a
        state to get instanced before it is on top of the stack.

        Parameters:
            state_name: Name of state to start.
            kwargs: Arguments to pass to the ``__init__`` method of the
                new state.

        """
        logger.debug("queue state: %s", state_name)
        self._state_queue.append((state_name, kwargs))

    def pop_state(self, state: Optional[State] = None) -> None:
        """
        Pop some state.

        The default state is the current one. The previously running state
        will resume, unless there is a queued state, then that state will
        become the new current state, not the previous.

        Parameters:
            state: The state to remove from stack. Use None (or omit) for
                current state.

        Raises:
            RuntimeError: If the stack is empty, or if ``state`` is not in
                the stack.

        """
        # handle situation where there is a queued state
        if self._state_queue:
            state_name, kwargs = self._state_queue.pop(0)
            self.replace_state(state_name, **kwargs)
            logger.debug("pop state, using queue instead: %s", state_name)
            return

        # raise error if stack is empty
        if not self._state_stack:
            raise RuntimeError("Attempted to pop state when stack was empty.")

        # pop the top state
        if state is None:
            state = self._state_stack[0]

        try:
            index = self._state_stack.index(state)
        except ValueError:
            # ``list.index`` raises ValueError (not IndexError) on a miss.
            raise RuntimeError(
                "Attempted to pop state when state was not active.",
            ) from None

        if index == 0:
            logger.debug("pop state: %s", state.name)
            self._state_stack.pop(0)
            # Make sure a pending resume happens before the matching pause.
            self._check_resume(state)
            state.pause()
            state.shutdown()
            if self._state_stack:
                self._resume_set.add(self._state_stack[0])
            if self._on_state_change_hook:
                self._on_state_change_hook()
        else:
            logger.debug("pop-remove state: %s", state.name)
            self._state_stack.remove(state)

    def remove_state(self, state: State) -> None:
        """
        Remove state by passing a reference to it.

        If the state is on top of the stack this behaves like
        ``pop_state``; otherwise the state is removed and shut down.

        Parameters:
            state: State to remove

        Raises:
            RuntimeError: If ``state`` is not in the stack.

        """
        try:
            index = self._state_stack.index(state)
        except ValueError:
            # ``list.index`` raises ValueError (not IndexError) on a miss.
            logger.critical(
                "Attempted to remove state which is not in the stack",
            )
            raise RuntimeError(
                "Attempted to remove state which is not in the stack",
            ) from None

        if index == 0:
            logger.debug("remove-pop state: %s", state.name)
            self.pop_state()
        else:
            logger.debug("remove state: %s", state.name)
            self._state_stack.remove(state)
            state.shutdown()

    @overload
    def push_state(self, state_name: str, **kwargs: Any) -> State:
        pass

    @overload
    def push_state(
        self,
        state_name: StateType,
        **kwargs: Any,
    ) -> StateType:
        pass

    def push_state(
        self,
        state_name: Union[str, StateType],
        **kwargs: Any,
    ) -> State:
        """
        Pause currently running state and start new one.

        Parameters:
            state_name: Name of state to start, or an already instanced
                state. Passing a state class is deprecated.
            kwargs: Arguments to pass to the ``__init__`` method of the
                new state.

        Returns:
            Instanced state.

        Raises:
            RuntimeError: If ``state_name`` is a string that does not match
                a registered state.

        """
        logger.debug("push state: %s", state_name)
        previous = self.current_state
        if previous is not None:
            # Make sure a pending resume happens before the matching pause.
            self._check_resume(previous)
            previous.pause()

        if isinstance(state_name, State):
            instance = state_name
        elif isinstance(state_name, str):
            instance = self._instance(state_name, **kwargs)
        else:
            warnings.warn(
                "Calling push_state with Type[State] is deprecated, use an instantiated State instead",
                DeprecationWarning,
            )
            instance = state_name(**kwargs)

        # ``startup`` is deprecated on State but is still invoked here.
        instance.startup(**kwargs)
        self._resume_set.add(instance)
        self._state_stack.insert(0, instance)
        if self._on_state_change_hook:
            self._on_state_change_hook()
        return instance

    @overload
    def replace_state(self, state_name: str, **kwargs: Any) -> State:
        pass

    @overload
    def replace_state(
        self,
        state_name: StateType,
        **kwargs: Any,
    ) -> StateType:
        pass

    def replace_state(
        self,
        state_name: Union[str, State],
        **kwargs: Any,
    ) -> State:
        """
        Replace the currently running state with a new one.

        This is essentially just a ``push_state``, followed by
        ``remove_state(running_state)``.

        This cannot be used to replace states in the middle of the stack.

        Parameters:
            state_name: Name of state to start.
            kwargs: Arguments to pass to the ``__init__`` method of the
                new state.

        Returns:
            Instanced state.

        Raises:
            RuntimeError: If the stack is empty.

        """
        logger.debug("replace state: %s", state_name)
        # raise error if stack is empty
        if not self._state_stack:
            raise RuntimeError(
                "Attempted to replace state when stack was empty."
            )

        previous = self._state_stack[0]
        instance = self.push_state(state_name, **kwargs)
        self.remove_state(previous)
        return instance

    @property
    def current_state(self) -> Optional[State]:
        """
        Return the currently running state, if any.

        Returns:
            Currently running state, or ``None`` if the stack is empty.

        """
        try:
            return self._state_stack[0]
        except IndexError:
            return None

    @property
    def active_states(self) -> Sequence[State]:
        """
        Sequence of states that are active.

        Returns:
            List of active states (a copy of the stack).

        """
        return self._state_stack[:]

    @property
    def queued_states(self) -> Sequence[Tuple[str, Mapping[str, Any]]]:
        """
        Sequence of states that are queued.

        Returns:
            List of queued states (a copy of the queue).

        """
        return self._state_queue[:]

    @overload
    def get_state_by_name(self, state_name: str) -> State:
        pass

    @overload
    def get_state_by_name(
        self,
        state_name: Type[StateType],
    ) -> StateType:
        pass

    def get_state_by_name(
        self,
        state_name: Union[str, Type[State]],
    ) -> State:
        """
        Query the state stack for a state by the name supplied.

        Parameters:
            state_name: Name of a state, or the state class itself.

        Returns:
            First state in the stack whose class, or class name, matches.

        Raises:
            ValueError: If no active state matches.

        """
        for state in self.active_states:
            if (
                state.__class__.__name__ == state_name
                or state.__class__ == state_name
            ):
                return state

        raise ValueError(f"Missing state {state_name}")

    def get_queued_state_by_name(
        self,
        state_name: str,
    ) -> Tuple[str, Mapping[str, Any]]:
        """
        Query the queued state stack for a state by the name supplied.

        Parameters:
            state_name: Name of a state.

        Returns:
            First queued ``(name, kwargs)`` entry with that name.

        Raises:
            ValueError: If no queued state has that name.

        """
        for queued_state in self._state_queue:
            if queued_state[0] == state_name:
                return queued_state

        raise ValueError(f"Missing queued state {state_name}")