Source code for tuxemon.event.eventengine

# SPDX-License-Identifier: GPL-3.0
# Copyright (c) 2014-2023 William Edwards <shadowapex@gmail.com>, Benjamin Bean <superman2k5@gmail.com>
from __future__ import annotations

import logging
from contextlib import contextmanager
from textwrap import dedent
from typing import (
    Any,
    Dict,
    Generator,
    Iterable,
    List,
    Optional,
    Sequence,
    Tuple,
    Type,
    Union,
)

from tuxemon import plugin, prepare
from tuxemon.constants import paths
from tuxemon.event import EventObject, MapAction, MapCondition
from tuxemon.event.eventaction import EventAction
from tuxemon.event.eventcondition import EventCondition
from tuxemon.map import TuxemonMap
from tuxemon.platform.const import buttons
from tuxemon.platform.events import PlayerInput
from tuxemon.session import Session

logger = logging.getLogger(__name__)


[docs]class RunningEvent: """ Manage MapEvents that are used during gameplay. Running events are considered to have all conditions satisfied. Once started, they will eventually execute all actions of the MapEvent. RunningEvents do not preserve state between calls or maps. RunningEvents have an action_index. The action_index is the index of the action list of the action currently running. The current_action attribute is the instance of the running action. Actions being managed by the RunningEvent class can share information using the context dictionary. Parameters: map_event: Event defined in the map containing the information about the actions. """ __slots__ = ( "map_event", "context", "action_index", "current_action", "current_map_action", ) def __init__(self, map_event: EventObject) -> None: self.map_event = map_event self.context: Dict[str, Any] = dict() self.action_index = 0 self.current_action: Optional[EventAction[Any]] = None self.current_map_action = None
[docs] def get_next_action(self) -> Optional[MapAction]: """ Get the next action to execute, if any. Returns MapActions, which are just data from the map, not live objects. ``None`` will be returned if the MapEvent is finished. Returns: Next action to execute. ``None`` if there isn't one. """ # if None, then make a new one try: action = self.map_event.acts[self.action_index] except IndexError: # reached end of list, remove event and move on logger.debug("map event actions finished") return None return action
[docs] def advance(self) -> None: self.action_index += 1
[docs]class EventEngine: """ A class for the event engine. The event engine checks to see if a group of conditions have been met and then executes a set of actions. Actions in the same MapEvent are not run concurrently, and they can be run over one or several frames. Currently this engine is run in the context of a single map. Any actions or conditions executed on one map will be reset when the map is changed. Parameters: session: Object containing the session information. """ def __init__(self, session: Session) -> None: self.session = session self.running_events: Dict[int, RunningEvent] = dict() self.name = "Event" self.current_map: Optional[TuxemonMap] = None self.timer = 0.0 self.wait = 0.0 self.button = None # debug self.partial_events: List[Sequence[Tuple[bool, MapCondition]]] = list() self.conditions = plugin.load_plugins( paths.CONDITIONS_PATH, "conditions", interface=EventCondition, ) # Mypy fails to typecheck here because # https://github.com/python/mypy/issues/4717 # The workarounds are ugly, so we will wait # for that to be fixed. self.actions = plugin.load_plugins( paths.ACTIONS_PATH, "actions", interface=EventAction, # type: ignore[type-abstract] )
[docs] def reset(self) -> None: """Clear out running events. Use when changing maps.""" self.running_events = dict() self.current_map = None self.timer = 0.0 self.wait = 0.0 self.button = None
[docs] def get_action( self, name: str, parameters: Optional[Sequence[Any]] = None, ) -> Optional[EventAction[Any]]: """ Get an action that is loaded into the engine. A new instance will be returned each time. Return ``None`` if action is not loaded. Parameters: name: Name of the action. parameters: List of parameters that the action accepts. Returns: New instance of the action with the appropriate parameters if that action is loaded. ``None`` otherwise. """ if parameters is None: parameters = list() try: action = self.actions[name] except KeyError: error = f'Error: EventAction "{name}" not implemented' logger.warning(error) return None try: return action(*parameters) except Exception as e: logger.warning( f"Error running {name}. Could not instantiate {action} with parameters {parameters}: {e}" ) return None
[docs] def get_actions(self) -> List[Type[EventAction]]: """ Return list of EventActions. """ return list(self.actions.values())
[docs] def get_condition(self, name: str) -> Optional[EventCondition]: """ Get a condition that is loaded into the engine. A new instance will be returned each time. Return ``None`` if condition is not loaded. Parameters: name: Name of the condition. Returns: New instance of the condition if that condition is loaded. ``None`` otherwise. """ # TODO: make generic try: condition = self.conditions[name] except KeyError: error = f'Error: EventCondition "{name}" not implemented' logger.warning(error) return None else: return condition()
[docs] def get_conditions(self) -> List[Type[EventCondition]]: """ Return list of EventConditions. """ return list(self.conditions.values())
[docs] def check_condition( self, cond_data: MapCondition, ) -> bool: """ Check if condition is true of false. Returns ``False`` if the condition is not loaded properly. Parameters: cond_data: The condition to check. Returns: The value of the condition. """ map_condition = self.get_condition(cond_data.type) if map_condition is None: logger.debug(f'map condition "{cond_data.type}" is not loaded') return False result = map_condition.test(self.session, cond_data) == ( cond_data.operator == "is" ) logger.debug( f'map condition "{map_condition.name}": {result} ({cond_data})' ) return result
[docs] def execute_action( self, action_name: str, parameters: Optional[Sequence[Any]] = None, ) -> None: """ Load and execute an action. This will cause the game to hang if an action waits on game changes. Parameters: action_name: Name of the action. parameters: Parameters of the action. """ if parameters is None: parameters = list() action = self.get_action(action_name, parameters) if action is None: error_msg = f'Map action "{action_name}" is not loaded' logger.warning(error_msg) raise ValueError(error_msg) return action.execute()
[docs] def start_event(self, map_event: EventObject) -> None: """ Begins execution of action list. Conditions are not checked. Parameters: map_event: Event whose actions will be executed. """ # the event id is used to make sure multiple copies of the same event # are not started. If not checked, then the game would freeze while # it tries to run unlimited copies of the same event, forever. if map_event.id not in self.running_events: logger.debug(f"starting map event: {map_event}") logger.debug("Executing action list") logger.debug(map_event) token = RunningEvent(map_event) assert map_event.id self.running_events[map_event.id] = token
[docs] def process_map_event(self, map_event: EventObject) -> None: """ Check the conditions of an event, and execute actions they are met. Actions will be started, but may finish much later. Parameters: map_event: Event to process. """ # debugging mode is slower and will check all conditions if prepare.CONFIG.collision_map: # less optimal, debug started = 0 conds = list() for cond in map_event.conds: # TODO: wrap with add_error_context if self.check_condition(cond): conds.append((True, cond)) started += 1 else: conds.append((False, cond)) if started == len(map_event.conds): self.start_event(map_event) self.partial_events.append(conds) else: # optimal, less debug if all(self.check_condition(cond) for cond in map_event.conds): self.start_event(map_event)
[docs] def process_map_events(self, events: Iterable[EventObject]) -> None: """ Process all events in an iterable. Simple now, may become more complex. Parameters: events: Iterable of events to process. """ for event in events: self.process_map_event(event)
[docs] def update(self, dt: float) -> None: """ Check all the MapEvents and start their actions if conditions are met. Parameters: dt: Amount of time passed in seconds since last frame. """ # debug self.partial_events = list() self.check_conditions() self.update_running_events(dt)
[docs] def check_conditions(self) -> None: """ Checks conditions. If any are satisfied, start the MapActions. Actions may be started during this function. """ # do the "init" events. this will be done just once # TODO: find solution that doesn't nuke the init list # TODO: make event engine generic, so can be used in global scope, # not just maps if self.session.client.inits: self.process_map_events(self.session.client.inits) self.session.client.inits = list() # process any other events self.process_map_events(self.session.client.events)
[docs] def update_running_events(self, dt: float) -> None: """ Update the events that are running. Parameters: dt: Amount of time passed in seconds since last frame. """ to_remove = set() current_map = self.current_map # Loop through the list of actions and update them for i, e in self.running_events.items(): # If the current map has changed, then `reset` has also been # called, which replaced self.running_events with an empty dict. # We need to stop processing the running_events, as they may not # make sense on the new map. We need to explicitly guard for this # because actions within this loop can change the map. if current_map != self.current_map: # The map has just changed, so running_events should have been # emptied. assert not self.running_events return while 1: """ * if RunningEvent is currently running an action, then continue to do so * if not, attempt to get the next queued action * if no queued action, do not check the RunningEvent next frame * if there is an action, then update it * if action is finished, then clear the pointer to the action and inc. the index, cleanup * RunningEvent will be checked next frame This loop will execute as many actions as possible for every MapEvent. For example, some actions like set_variable do not require several frames, so all of them will be processed this frame. If an action is not finished, then this loop breaks and will check another RunningEvent, but the position in the action list is remembered and will be restored. """ if e.current_action is None: next_action = e.get_next_action() if next_action is None: # no next action, so remove the running event to_remove.add(i) break else: # got an action, so start it action = self.get_action( next_action.type, next_action.parameters, ) if action is None: # action was not loaded, so, break? raise # exception, idk # TODO: raise custom exception instead of None # return? # TODO: decide what to do for actions not loaded logger.debug("action is not loaded!") to_remove.add(i) break else: # start the action # with add_error_context(e.map_event, next_action, # self.session): action.start() # save the action that is running e.current_action = action # update the action action = e.current_action # with add_error_context(e.map_event, e.current_map_action, # self.session): action.update() if action.done: # action finished, so continue and do the next one, # if available action.cleanup() e.advance() e.current_action = None logger.debug(f"action finished: {action}") else: # action didn't finish, so move on to next RunningEvent break for i in to_remove: try: del self.running_events[i] except KeyError: # map changes or engine resets may cause this error pass
[docs] def process_event(self, event: PlayerInput) -> Optional[PlayerInput]: """ Handles player input events. This function is only called when the player provides input such as pressing a key or clicking the mouse. Since this is part of a chain of event handlers, the return value from this method becomes input for the next one. Returning ``None`` signifies that this method has dealt with an event and wants it exclusively. Return the event and others can use it as well. You should return ``None`` if you have handled input here. Parameters: event: The event received. Returns: The input event, or ``None`` to prevent others to receive it. """ # has the player pressed the action key? if event.pressed and event.button == buttons.A: for map_event in self.session.client.interacts: self.process_map_event(map_event) return event
[docs]@contextmanager def add_error_context( event: EventObject, item: Union[MapCondition, MapAction], session: Session, ) -> Generator[None, None, None]: """ Add error information about the involved condition or action. This should be used as a context manager for code that may fail associated with a particular condition or action. Parameters: event: Event associated with the condition or action. item: Condition or action that produces the error. session: Object containing the session information. """ try: yield except Exception: from lxml import etree file_name = session.client.get_map_filepath() tree = etree.parse(file_name) event_node = tree.find("//object[@id='%s']" % event.id) msg = None if event_node: if item.name is None: # It's an "interact" event, so no condition defined in the map msg = """ Error in {file_name} {event} Line {line_number} """.format( file_name=file_name, event=etree.tostring(event_node) .decode() .split("\n")[0] .strip(), line_number=event_node.sourceline, ) else: # This is either a condition or an action child_node = event_node.find( ".//property[@name='%s']" % (item.name) ) if child_node: msg = """ Error in {file_name} {event} ... {line} Line {line_number} """.format( file_name=file_name, event=etree.tostring(event_node) .decode() .split("\n")[0] .strip(), line=etree.tostring(child_node).decode().strip(), line_number=child_node.sourceline, ) if msg: print(dedent(msg)) raise