Source code for memoize.statuses

"""
[Internal use only] Encapsulates update state management.
"""
import asyncio
import datetime
import logging
from asyncio import Future
from typing import Dict, Awaitable, Union

from memoize.entry import CacheKey, CacheEntry


[docs] class UpdateStatuses: def __init__(self, update_lock_timeout: datetime.timedelta = datetime.timedelta(minutes=5)) -> None: self.logger = logging.getLogger(__name__) self._update_lock_timeout = update_lock_timeout self._updates_in_progress: Dict[CacheKey, Future] = {}
[docs] def is_being_updated(self, key: CacheKey) -> bool: """Checks if update for given key is in progress. Obtained info is valid until control gets back to IO-loop.""" return key in self._updates_in_progress
[docs] def mark_being_updated(self, key: CacheKey) -> None: """Informs that update has been started. Should be called only if 'is_being_updated' returned False (and since then IO-loop has not been lost).. Calls to 'is_being_updated' will return True until 'mark_updated' will be called.""" if key in self._updates_in_progress: raise ValueError('Key {} is already being updated'.format(key)) future: Future = asyncio.Future() self._updates_in_progress[key] = future def complete_on_timeout_passed(): if key not in self._updates_in_progress: return if self._updates_in_progress[key] == future and not self._updates_in_progress[key].done(): self.logger.debug('Update task timed out - notifying clients awaiting for key %s', key) self._updates_in_progress[key].set_result(None) self._updates_in_progress.pop(key) asyncio.get_event_loop().call_later(delay=self._update_lock_timeout.total_seconds(), callback=complete_on_timeout_passed)
[docs] def mark_updated(self, key: CacheKey, entry: CacheEntry) -> None: """Informs that update has been finished. Calls to 'is_being_updated' will return False until 'mark_being_updated' will be called.""" if key not in self._updates_in_progress: raise ValueError('Key {} is not being updated'.format(key)) update = self._updates_in_progress.pop(key) update.set_result(entry)
[docs] def mark_update_aborted(self, key: CacheKey, exception: Exception) -> None: """Informs that update failed to complete. Calls to 'is_being_updated' will return False until 'mark_being_updated' will be called. Accepts exception to propagate it across all clients awaiting an update.""" if key not in self._updates_in_progress: raise ValueError('Key {} is not being updated'.format(key)) update = self._updates_in_progress.pop(key) update.set_result(exception)
[docs] def await_updated(self, key: CacheKey) -> Awaitable[Union[CacheEntry, Exception]]: """Waits (asynchronously) until update in progress has benn finished. Returns awaitable with the updated entry (or awaitable with an exception if update failed/timed-out). Should be called only if 'is_being_updated' returned True (and since then IO-loop has not been lost).""" if not self.is_being_updated(key): raise ValueError('Key {} is not being updated'.format(key)) return self._updates_in_progress[key]