Source code for tuxemon.clock

# SPDX-License-Identifier: GPL-3.0
# Copyright (c) 2014-2023 William Edwards <shadowapex@gmail.com>, Benjamin Bean <superman2k5@gmail.com>
import collections
import time
from heapq import heapify, heappop, heappush, heappushpop
from typing import Any, Callable, Deque, List, Optional, Union

__all__ = ("ScheduledItem", "Scheduler", "Clock")


[docs]class ScheduledItem: """A class that describes a scheduled callback. This class is never created by the user; instead, pygame creates and returns an instance of this class when scheduling a callback. If you hold on to instance of this class, do not modify any values of it. """ __slots__ = ["func", "interval", "last_ts", "next_ts"] def __init__( self, func: Any, last_ts: float, next_ts: float, interval: float ) -> None: self.func = func self.interval = interval self.last_ts = last_ts self.next_ts = next_ts def __lt__(self, other: Any) -> Any: try: return self.next_ts < other.next_ts except AttributeError: return self.next_ts < other
[docs]class Scheduler: """Class for scheduling functions.""" def __init__(self, time_function: Callable[[], float] = time.perf_counter): """Initialise a Clock, with optional custom time function. :Parameters: `time_function` : function Function to return the elapsed time of the application, in time units. """ super().__init__() self._time = time_function self._last_ts: float = -1 self._times: Deque[int] = collections.deque(maxlen=10) self._scheduled_items: List[ScheduledItem] = [] self._next_tick_items: List[ScheduledItem] = [] self.cumulative_time = 0.0 def _get_nearest_ts(self) -> Union[float, int]: """Schedule from now, unless now is sufficiently close to last_ts, in which case use last_ts. This clusters together scheduled items that probably want to be scheduled together. """ last_ts = self._last_ts ts = self._time() if ts - last_ts > 0.2: last_ts = ts return last_ts def _get_soft_next_ts(self, last_ts: float, interval: float) -> float: def taken(ts: float, e: float) -> bool: """Return True if the given time has already got an item scheduled nearby. """ for item in self._scheduled_items: if item.next_ts is None: continue elif abs(item.next_ts - ts) <= e: return True elif item.next_ts > ts + e: return False return False next_ts = last_ts + interval if not taken(next_ts, interval / 4): return next_ts dt = interval divs = 1 while True: next_ts = last_ts for i in range(divs - 1): next_ts += dt if not taken(next_ts, dt / 4.0): return next_ts dt /= 2 divs *= 2 # Avoid infinite loop in pathological case if divs > 16: return next_ts
[docs] def schedule( self, func: Any, delay: float = 0.0, repeat: bool = False, soft: bool = False, ) -> ScheduledItem: """ Schedule a function to be run sometime in the future. The function should have a prototype that includes ``dt`` as the first argument, which gives the elapsed time, in time units, since the last clock tick. def callback(dt): pass Limitations =========== There is a hard limit of 10 items that can be scheduled on next tick. This limit reduces the power and performance impact of the clock on mobile and battery operated computers. A runtime error will be raised if the maximum is reached. Unscheduling ============ If callback returns False (not None), then it will not be scheduled again. Soft Scheduling =============== This is useful for functions that need to be called regularly, but not relative to the initial start time. for example: events which need to occur regularly -- if all audio updates are scheduled at the same time (for example, mixing several tracks of a music score, or playing multiple videos back simultaneously), the resulting load on the CPU is excessive for those intervals but idle outside. Using the soft interval scheduling, the load is more evenly distributed. Parameters: func: Function to be called delay: Delay in time unit until it is called repeat: Function will be repeated every 'delay' units soft: See notes about Soft Scheduling Returns: Reference to scheduled item """ last_ts = self._get_nearest_ts() if soft: assert delay > 0.0 next_ts = self._get_soft_next_ts(last_ts, delay) last_ts = next_ts - delay next_ts = last_ts + delay interval = delay if repeat else 0.0 item = ScheduledItem(func, last_ts, next_ts, interval) if next_ts == 0.0: self._next_tick_items.append(item) if len(self._next_tick_items) > 10: raise RuntimeError else: heappush(self._scheduled_items, item) return item
[docs] def tick(self) -> float: """ Cause clock to update and call scheduled functions. This updates the clock's internal measure of time and returns the difference since the last update (or since the clock was created). Will call any scheduled functions that have elapsed. Returns: The number of time units since the last "tick", or 0 if this was the first tick. """ delta_t = self.set_time(self._time()) self._times.append(int(delta_t)) self.call_scheduled_functions(delta_t) return delta_t
[docs] def get_interval(self) -> float: """Get the average amount of time passed between each tick. Useful for calculating FPS if this clock is used with the display. Returned value is averaged from last 10 ticks. Value will be 0.0 if before 1st tick. :rtype: float :return: Average amount of time passed between each tick """ try: return sum(self._times) / len(self._times) except ZeroDivisionError: return 0.0
[docs] def set_time(self, time_stamp: float) -> float: """ Set the clock manually and do not call scheduled functions. Return the difference in time from the last time clock was updated. Parameters: time_stamp: This will become the new value of the clock. Setting the clock to negative values will have undefined results. Returns: The number of time units since the last update, or 0.0 if this was the first update. """ # self._last_ts will be -1 before first time set if self._last_ts < 0: delta_t = 0.0 self._last_ts = time_stamp else: delta_t = time_stamp - self._last_ts self.cumulative_time += delta_t self._last_ts = time_stamp return delta_t
[docs] def call_scheduled_functions(self, dt: float) -> bool: """ Call scheduled functions that elapsed on the last `update_time`. Parameters: dt: The elapsed time since the last update to pass to each scheduled function. Returns: Returns True if any functions were called, otherwise False. """ scheduled_items = self._scheduled_items now = self._last_ts result = False # handle items scheduled for next tick if self._next_tick_items: result = True for item in list(self._next_tick_items): retval = item.func(dt) # do not change the following line to "if not retval"! # some items will return None, but False is a special value if retval == False: self._next_tick_items.remove(item) # check the next scheduled item that is not called each tick # if it is scheduled in the future, then exit try: item = scheduled_items[0] if item.next_ts > now: return result except IndexError: return result # we have at least one item that is due to be called result = True get_soft_next_ts = self._get_soft_next_ts # wherever this value is true the current item will be pushed # into the heap. it essentially means that the current # scheduled item is important and needs stay scheduled. replace = False while scheduled_items: # get the next item to be called head = scheduled_items[0] # if next item is scheduled in the future then exit while # taking care of the heap if head.next_ts > now: if replace: heappush(scheduled_items, item) replace = False break # the scheduler will hold onto a reference to an item in # case it needs to be rescheduled. it is more efficient # to push and pop the heap in one call than to make two # calls. if replace: item = heappushpop(scheduled_items, item) else: item = heappop(scheduled_items) # call the function associated with the scheduled item retval = item.func(now - item.last_ts) if item.interval: # callbacks can unschedule themselves by returning false replace = not retval == False item.next_ts = item.last_ts + item.interval item.last_ts = now # the execution time of this item has already passed # so it must be rescheduled if item.next_ts <= now: if now - item.next_ts < 0.05: item.next_ts = now + item.interval else: # missed by significant amount, do a soft reschedule # to avoid lumping everything together # in this case, the next dt will not be accurate item.next_ts = get_soft_next_ts(now, item.interval) item.last_ts = item.next_ts - item.interval else: # replace is False, so this item will not be rescheduled replace = False # it is possible that the loop exited while an important item # was waiting to be pushed into the heap. if replace: heappush(scheduled_items, item) return result
[docs] def get_idle_time(self) -> Optional[float]: """ Get the time until the next item is scheduled. Returns: Time until the next scheduled event in time units, or ``None`` if there is no event scheduled. """ if self._next_tick_items: return 0.0 try: next_ts = self._scheduled_items[0].next_ts return max(next_ts - self._time(), 0.0) except IndexError: return None
[docs] def unschedule(self, func: Any) -> None: """ Remove a function from the schedule. If the function appears in the schedule more than once, all occurrences are removed. If the function was not scheduled, no error is raised. Parameters: func: The function to remove from the schedule. """ def remove(list_: List[ScheduledItem]) -> bool: resort = False remove_ = list_.remove for i in list(i for i in list_ if i.func is func): remove_(i) resort = True return resort remove(self._next_tick_items) if remove(self._scheduled_items): # this will restructure the heap heapify(self._scheduled_items)
[docs]class Clock(Scheduler): """Schedules stuff like a Scheduler, and includes time limiting functions WIP """ @staticmethod def _least_squares( gradient: int = 1, offset: int = 0, ) -> Any: """ source: pyglet.app.App Parameters: gradient: offset: """ X = 0 Y = 0 XX = 0 XY = 0 n = 0 x, y = yield gradient, offset X += x Y += y XX += x * x XY += x * y n += 1 while True: x, y = yield gradient, offset X += x Y += y XX += x * x XY += x * y n += 1 try: gradient = int((n * XY - X * Y) / (n * XX - X * X)) offset = int((Y - gradient * X) / n) except ZeroDivisionError: # Can happen in pathological case; keep current # gradient/offset for now. pass