cuery.asy
=========

.. py:module:: cuery.asy

.. autoapi-nested-parse::

   cuery.asy
   =========

   Composable async "policy" wrappers (timeout, semaphore, retries, fallback,
   timer, progress) for coroutine *factories*.

   Why factories instead of raw coroutines?
   ----------------------------------------
   Each policy wrapper calls the provided zero‑argument callable to obtain a
   *fresh* coroutine object every time it needs to run (initial execution, each
   retry, etc.). This avoids the common pitfall of trying to re‑await an already
   awaited coroutine and enables clean layering of cross‑cutting concerns.

   Acceptable inputs ("coro factories"):

   * An ``async def`` function with all arguments pre‑bound via ``functools.partial``
   * A ``lambda: some_async_fn(...)`` returning a coroutine
   * Any ``Callable[[], Awaitable[Any]]`` fulfilling the ``CoroFactory`` protocol

   NOT acceptable:

   * An already *created* coroutine object (e.g. ``some_async_fn(...)``) because
     it is not callable and cannot be re‑created for retries
   * A sync function unless it returns an awaitable (would raise ``TypeError``
     when awaited)

   Provided wrappers
   -----------------
   ``with_timeout``
       Apply an ``asyncio.wait_for`` timeout (per attempt)
   ``with_semaphore``
       Limit concurrency with an ``asyncio.Semaphore``
   ``with_retries``
       Exponential backoff retries using ``tenacity`` (returns after first success)
   ``with_fallback``
       Provide a fallback value or callable if the wrapped factory raises
   ``with_timer``
       Time a single execution attempt and log the elapsed seconds
   ``with_progress``
       Increment a ``tqdm`` progress bar and optionally call an async progress callback
   ``with_policies``
       Convenience function composing any subset of the above in a predictable order

   Composition order in ``with_policies``
   --------------------------------------
   The order is: timeout -> semaphore -> retries -> fallback -> timer -> progress.
   This means, for example, that retries encompass timeout and semaphore
   acquisition, and that fallback is only engaged after retries are exhausted.
   The timer (if enabled) measures only the final, successful execution path
   (after fallback if provided) and excludes progress update overhead. Progress
   is only updated on *successful* completions of the wrapped call.

   Tenacity retry semantics
   ------------------------
   ``with_retries`` uses ``AsyncRetrying`` with exponential backoff capped by
   ``wait_max``. It returns the first successful result. If all attempts fail,
   it raises the last exception.

   Partial objects "just work"
   ---------------------------
   ``functools.partial`` of an ``async def`` is itself a zero‑arg callable
   producing a new coroutine each invocation, so it satisfies the factory
   contract even if static type checkers cannot perfectly infer
   ``Callable[[], Awaitable[Any]]``. This is why passing
   ``partial(fetch, url, client)`` to ``with_policies`` behaves correctly.

   .. rubric:: Example

   .. code-block:: python

      import asyncio
      from functools import partial
      from asyncio import Semaphore
      from tqdm.auto import tqdm
      from cuery.asy import with_policies

      async def fetch(idx: int) -> int:
          await asyncio.sleep(0.1)
          if idx % 5 == 0:
              raise RuntimeError("boom")
          return idx

      async def main():
          sem = Semaphore(10)
          pbar = tqdm(total=100)

          async def progress_hook(d: dict):  # optional
              # d contains tqdm's format_dict (e.g. n, total, rate, etc.)
              pass

          factories = [partial(fetch, i) for i in range(100)]
          wrapped = [
              with_policies(
                  f,
                  timeout=1.0,
                  semaphore=sem,
                  retries=2,
                  wait_max=5,
                  fallback=lambda: -1,  # return -1 after all retries fail
                  pbar=pbar,
                  progress_callback=progress_hook,
                  min_iters=10,
              )
              for f in factories
          ]
          results = await asyncio.gather(*[w() for w in wrapped])
          pbar.close()
          return results

      if __name__ == "__main__":
          asyncio.run(main())

   Design goals
   ------------
   * Small, orthogonal wrappers easy to unit test
   * Deferred coroutine creation for safe retries / timeouts
   * Minimal runtime overhead when a policy is unused (simple identity composition)
   * Clear logging hooks (see retry ``after=`` callback and fallback warning)


Attributes
----------

.. autoapisummary::

   cuery.asy.CoroFactory
   cuery.asy.AsyncFunc


Functions
---------

.. autoapisummary::

   cuery.asy.with_timeout
   cuery.asy.with_semaphore
   cuery.asy.with_retries
   cuery.asy.with_fallback
   cuery.asy.with_progress
   cuery.asy.with_timer
   cuery.asy.with_policies
   cuery.asy.all_with_policies


Module Contents
---------------

.. py:data:: CoroFactory

   A callable that produces an awaitable coroutine when called.


.. py:data:: AsyncFunc

   An async function type alias for better readability.


.. py:function:: with_timeout(coro_factory, timeout)

   Wrap a coroutine factory with a per-run timeout.


.. py:function:: with_semaphore(coro_factory, semaphore)

   Wrap a coroutine factory with concurrency control using a semaphore.


.. py:function:: with_retries(coro_factory, attempts = 3, wait_max = 60)

   Wrap a coroutine factory with retry logic (tenacity).


.. py:function:: with_fallback(coro_factory, fallback)

   Wrap a coroutine factory with fallback if all attempts fail.


.. py:function:: with_progress(coro_factory, pbar, progress_callback = None, min_iters = 1)

   Wrap a coroutine factory to update a tqdm progress bar after completion, and
   optionally call an async progress_callback every `min_iters` steps or on
   final step.


.. py:function:: with_timer(coro_factory, label = None)

   Wrap a coroutine factory with a simple elapsed time logger.

   Logs the wall-clock seconds the coroutine took to resolve (successful or via
   fallback if used outside). If the coroutine raises an exception the timing is
   still logged before the exception is propagated.

   :param coro_factory: Zero-arg coroutine factory to wrap.
   :param label: Optional label to include in the log line for identification.


.. py:function:: with_policies(coro_factory, timeout = None, semaphore = None, retries = 0, wait_max = 60, fallback = None, timer = False, pbar = None, progress_callback = None, min_iters = 1, label = None)

   Wrap a coroutine factory with multiple policies.


.. py:function:: all_with_policies(func, kwds = None, policies = None, labels = None)

   Create a list of wrapped coroutines for many parameter sets.

   :param func: The async function to wrap.
   :param kwds: Iterable of keyword-argument dicts.
   :param policies: Same keyword arguments as `with_policies`.

   :returns: List of coroutines, each wrapped with the given policies, ready to
             be awaited or gathered.
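
The pattern behind ``all_with_policies`` (bind each keyword-argument dict to an
async function, wrap the resulting factory, then create the coroutines) can be
sketched with the standard library alone. The names ``timeout_sketch``,
``wrap_all_sketch`` and ``double`` are hypothetical stand-ins chosen for this
illustration, and only a timeout policy is shown; this is a minimal sketch
under those assumptions, not the module's actual implementation.

.. code-block:: python

   import asyncio
   from functools import partial
   from typing import Any, Awaitable, Callable, Iterable

   # Hypothetical alias mirroring the factory contract described above.
   CoroFactory = Callable[[], Awaitable[Any]]


   def timeout_sketch(factory: CoroFactory, timeout: float) -> CoroFactory:
       """Return a new factory whose coroutine is bounded by ``timeout`` seconds."""

       async def run() -> Any:
           # The factory is called here, so every run gets a fresh coroutine.
           return await asyncio.wait_for(factory(), timeout)

       return run


   def wrap_all_sketch(func, kwds: Iterable[dict], timeout: float) -> list:
       """Bind each kwargs dict to ``func``, wrap it, and create the coroutines."""
       factories = [partial(func, **kw) for kw in kwds]
       return [timeout_sketch(f, timeout)() for f in factories]


   async def double(x: int) -> int:
       await asyncio.sleep(0.01)
       return 2 * x


   async def demo() -> list:
       coros = wrap_all_sketch(double, [{"x": 1}, {"x": 2}, {"x": 3}], timeout=1.0)
       # gather returns results in the order the coroutines were passed in.
       return await asyncio.gather(*coros)


   results = asyncio.run(demo())

Because ``partial`` defers the call, each wrapped factory can be invoked again
to obtain a new coroutine, which is what makes retry-style policies safe to
layer on top of it.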