cuery.asy#

Composable async “policy” wrappers (timeout, semaphore, retries, fallback, timer, progress) for coroutine factories.

Why factories instead of raw coroutines?#

Each policy wrapper calls the provided zero‑argument callable to obtain a fresh coroutine object every time it needs to run (initial execution, each retry, etc.). This avoids the common pitfall of trying to re‑await an already awaited coroutine and enables clean layering of cross‑cutting concerns.

Acceptable inputs (“coro factories”):
  • An async def function with all arguments pre‑bound via functools.partial

  • A lambda: some_async_fn(...) returning a coroutine

  • Any Callable[[], Awaitable[Any]] fulfilling the CoroFactory protocol

NOT acceptable:
  • An already created coroutine object (e.g. some_async_fn(...)) because it is not callable and cannot be re‑created for retries

  • A sync function unless it returns an awaitable (would raise TypeError when awaited)
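The difference between a factory and a coroutine object can be seen with the standard library alone. The sketch below is illustrative only: fetch, run_twice and reawait_coroutine are hypothetical names, not part of cuery.asy.

```python
import asyncio
from functools import partial


async def fetch(x: int) -> int:
    # Hypothetical stand-in for real async work.
    await asyncio.sleep(0)
    return x * 2


async def run_twice(factory):
    # Each call to the factory yields a fresh coroutine object.
    return [await factory(), await factory()]


async def reawait_coroutine():
    coro = fetch(21)  # a coroutine object, not a factory
    first = await coro
    try:
        await coro  # a coroutine object can only be awaited once
    except RuntimeError as exc:
        return first, str(exc)
    return first, None


factory_results = asyncio.run(run_twice(partial(fetch, 21)))
first, error = asyncio.run(reawait_coroutine())
print(factory_results)
print(error)
```

The factory can be run any number of times, which is what a retry wrapper needs. The second await of the coroutine object fails with a RuntimeError.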

Provided wrappers#

with_timeout     Apply an asyncio.wait_for timeout (per attempt)
with_semaphore   Limit concurrency with an asyncio.Semaphore
with_retries     Exponential backoff retries using tenacity (returns after first success)
with_fallback    Provide a fallback value or callable if the wrapped factory raises
with_timer       Time a single execution attempt and log the elapsed seconds
with_progress    Increment a tqdm progress bar and optionally call an async progress callback
with_policies    Convenience function composing any subset of the above in a predictable order

Composition order in with_policies#

Wrappers are applied in the order timeout -> semaphore -> retries -> fallback -> timer -> progress, so timeout is the innermost layer and progress the outermost. This means, for example, that retries encompass timeout and semaphore acquisition (each attempt gets its own timeout), and that the fallback is engaged only after retries are exhausted. The timer (if enabled) measures only the final, successful execution path (after fallback if provided) and excludes progress update overhead. Progress is updated only when the wrapped call completes successfully.
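The nesting implied by this order can be visualised with a small tracing sketch. It assumes the listed order is the order of application (first name innermost), which is consistent with retries encompassing timeout and semaphore. The layer and compose helpers are hypothetical and only record when each layer is entered and left. They do not implement any actual policy.

```python
import asyncio
from typing import Any, Awaitable, Callable

CoroFactory = Callable[[], Awaitable[Any]]
trace: list[str] = []


def layer(factory: CoroFactory, name: str) -> CoroFactory:
    # Hypothetical tracing wrapper: records entry and exit, nothing else.
    async def wrapped() -> Any:
        trace.append(f"enter {name}")
        try:
            return await factory()
        finally:
            trace.append(f"exit {name}")

    return wrapped


async def work() -> str:
    trace.append("work")
    return "done"


def compose(factory: CoroFactory, names: list[str]) -> CoroFactory:
    # Apply wrappers in list order: the first name ends up innermost.
    for name in names:
        factory = layer(factory, name)
    return factory


order = ["timeout", "semaphore", "retries", "fallback", "timer", "progress"]
result = asyncio.run(compose(work, order)())
print("\n".join(trace))
```

The trace starts with the outermost layer (progress) and reaches the timeout layer last, immediately before the wrapped work runs.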

Tenacity retry semantics#

with_retries uses AsyncRetrying with exponential backoff capped by wait_max. It returns the first successful result. If all attempts fail, it raises the last exception.
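The semantics described here (first success is returned, the last exception is raised once attempts run out, and waits grow exponentially up to wait_max) can be sketched without tenacity. The following is a standard-library stand-in, not the module's implementation. retry_sketch and its wait_base parameter are assumptions made for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable

CoroFactory = Callable[[], Awaitable[Any]]


def retry_sketch(
    factory: CoroFactory,
    attempts: int = 3,
    wait_base: float = 0.01,  # hypothetical parameter, not in cuery.asy
    wait_max: float = 0.05,
) -> CoroFactory:
    async def wrapped() -> Any:
        for attempt in range(1, attempts + 1):
            try:
                return await factory()  # fresh coroutine per attempt
            except Exception:
                if attempt == attempts:
                    raise  # attempts exhausted: re-raise the last exception
                # Exponential backoff, capped at wait_max.
                delay = min(wait_base * 2 ** (attempt - 1), wait_max)
                await asyncio.sleep(delay)

    return wrapped


calls = {"n": 0}


async def flaky() -> str:
    # Fails twice, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError(f"attempt {calls['n']} failed")
    return "ok"


async def always_fails() -> None:
    raise ValueError("still failing")


async def demo():
    ok = await retry_sketch(flaky, attempts=3)()
    try:
        await retry_sketch(always_fails, attempts=2)()
    except ValueError as exc:
        return ok, str(exc)
    return ok, None


ok, last_error = asyncio.run(demo())
print(ok, calls["n"], last_error)
```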

Partial objects “just work”#

functools.partial of an async def is itself a zero‑arg callable producing a new coroutine each invocation, so it satisfies the factory contract even if static type checkers cannot perfectly infer Callable[[], Awaitable[Any]]. This is why passing partial(fetch, url, client) to with_policies behaves correctly.
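One practical reason to prefer partial when building factories in a loop is Python's late binding of closures. The sketch below uses a hypothetical fetch function. It is a general Python illustration, not something specific to cuery.asy.

```python
import asyncio
from functools import partial


async def fetch(idx: int) -> int:
    # Hypothetical stand-in for real async work.
    await asyncio.sleep(0)
    return idx


async def run_all(factories):
    return await asyncio.gather(*[f() for f in factories])


# partial captures the current value of i when each factory is built.
partial_factories = [partial(fetch, i) for i in range(3)]

# A bare lambda looks up i only when called, after the loop has finished.
late_lambdas = [lambda: fetch(i) for i in range(3)]

# Binding i as a default argument restores the per-iteration value.
bound_lambdas = [lambda i=i: fetch(i) for i in range(3)]

partial_results = asyncio.run(run_all(partial_factories))
late_results = asyncio.run(run_all(late_lambdas))
bound_results = asyncio.run(run_all(bound_lambdas))
print(partial_results, late_results, bound_results)
```

All three lists are valid coroutine factories, but only the partial and default-argument versions call fetch with the index they were created for.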

Example

import asyncio
from functools import partial
from asyncio import Semaphore
from tqdm.auto import tqdm
from cuery.asy import with_policies

async def fetch(idx: int) -> int:
    await asyncio.sleep(0.1)
    if idx % 5 == 0:
        raise RuntimeError("boom")
    return idx

async def main():
    sem = Semaphore(10)
    pbar = tqdm(total=100)

    async def progress_hook(d: dict):  # optional
        # d contains tqdm's format_dict (e.g. n, total, rate, etc.)
        pass

    factories = [partial(fetch, i) for i in range(100)]
    wrapped = [
        with_policies(
            f,
            timeout=1.0,
            semaphore=sem,
            retries=2,
            wait_max=5,
            fallback=lambda: -1,  # return -1 after all retries fail
            pbar=pbar,
            progress_callback=progress_hook,
            min_iters=10,
        )
        for f in factories
    ]

    results = await asyncio.gather(*[w() for w in wrapped])
    pbar.close()
    return results

if __name__ == "__main__":
    asyncio.run(main())

Design goals#

  • Small, orthogonal wrappers easy to unit test

  • Deferred coroutine creation for safe retries / timeouts

  • Minimal runtime overhead when a policy is unused (simple identity composition)

  • Clear logging hooks (see retry after= callback and fallback warning)

Attributes#

CoroFactory

A zero-argument callable that returns a new awaitable (typically a coroutine) each time it is called.

AsyncFunc

An async function type alias for better readability.

Functions#

with_timeout(coro_factory, timeout)

Wrap a coroutine factory with a per-run timeout.

with_semaphore(coro_factory, semaphore)

Wrap a coroutine factory with concurrency control using a semaphore.

with_retries(coro_factory[, attempts, wait_max])

Wrap a coroutine factory with retry logic (tenacity).

with_fallback(coro_factory, fallback)

Wrap a coroutine factory with fallback if all attempts fail.

with_progress(coro_factory, pbar[, progress_callback, ...])

Wrap a coroutine factory to update a tqdm progress bar after completion, and optionally call an async progress_callback every min_iters steps or on final step.

with_timer(coro_factory[, label])

Wrap a coroutine factory with a simple elapsed time logger.

with_policies(coro_factory[, timeout, semaphore, ...])

Wrap a coroutine factory with multiple policies.

all_with_policies(func[, kwds, policies, labels])

Create a list of wrapped coroutines for many parameter sets.

Module Contents#

cuery.asy.CoroFactory#

A zero-argument callable that returns a new awaitable (typically a coroutine) each time it is called.

cuery.asy.AsyncFunc#

An async function type alias for better readability.

cuery.asy.with_timeout(coro_factory, timeout)#

Wrap a coroutine factory with a per-run timeout.

Parameters:
  • coro_factory (CoroFactory)

  • timeout (float)

Return type:

CoroFactory
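A minimal sketch of what such a wrapper might look like, assuming it delegates to asyncio.wait_for as described in the wrapper overview. timeout_sketch, slow and fast are hypothetical names. The actual implementation may differ.

```python
import asyncio
from typing import Any, Awaitable, Callable

CoroFactory = Callable[[], Awaitable[Any]]


def timeout_sketch(factory: CoroFactory, timeout: float) -> CoroFactory:
    async def wrapped() -> Any:
        # A new coroutine is created and time-limited on every call.
        return await asyncio.wait_for(factory(), timeout=timeout)

    return wrapped


async def slow() -> str:
    await asyncio.sleep(1.0)
    return "slow"


async def fast() -> str:
    await asyncio.sleep(0)
    return "fast"


async def demo():
    fast_result = await timeout_sketch(fast, 0.5)()
    try:
        await timeout_sketch(slow, 0.05)()
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return fast_result, timed_out


fast_result, timed_out = asyncio.run(demo())
print(fast_result, timed_out)
```

Because the result is again a zero-argument factory, it can be passed straight into another wrapper such as a retry layer.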

cuery.asy.with_semaphore(coro_factory, semaphore)#

Wrap a coroutine factory with concurrency control using a semaphore.

Parameters:
  • coro_factory (CoroFactory)

  • semaphore (asyncio.Semaphore)

Return type:

CoroFactory
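The idea can be sketched as follows, assuming the wrapper simply holds the semaphore while the wrapped coroutine runs. semaphore_sketch and job are hypothetical names, and the counters exist only to show the concurrency cap.

```python
import asyncio
from typing import Any, Awaitable, Callable

CoroFactory = Callable[[], Awaitable[Any]]


def semaphore_sketch(factory: CoroFactory, semaphore: asyncio.Semaphore) -> CoroFactory:
    async def wrapped() -> Any:
        # The coroutine is only created once a slot is available.
        async with semaphore:
            return await factory()

    return wrapped


running = 0
peak = 0


async def job() -> None:
    global running, peak
    running += 1
    peak = max(peak, running)  # track the highest observed concurrency
    await asyncio.sleep(0.01)
    running -= 1


async def demo() -> None:
    sem = asyncio.Semaphore(3)  # shared by all wrapped factories
    wrapped = [semaphore_sketch(job, sem) for _ in range(10)]
    await asyncio.gather(*[w() for w in wrapped])


asyncio.run(demo())
print(peak)
```

Sharing one Semaphore instance across all wrapped factories is what produces a global limit. A separate semaphore per factory would not restrict anything.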

cuery.asy.with_retries(coro_factory, attempts=3, wait_max=60)#

Wrap a coroutine factory with retry logic (tenacity).

Parameters:
  • coro_factory (CoroFactory)

  • attempts (int)

  • wait_max (int)

Return type:

collections.abc.Callable[[], collections.abc.Awaitable]

cuery.asy.with_fallback(coro_factory, fallback)#

Wrap a coroutine factory with fallback if all attempts fail.

Parameters:
  • coro_factory (CoroFactory)

  • fallback (collections.abc.Callable | Any)

Return type:

CoroFactory
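A rough sketch of the fallback behaviour. It assumes that a callable fallback is invoked with no arguments and any other value is returned as-is, which matches the fallback=lambda: -1 usage in the example above but is otherwise a guess. fallback_sketch and failing are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable

CoroFactory = Callable[[], Awaitable[Any]]


def fallback_sketch(factory: CoroFactory, fallback: Any) -> CoroFactory:
    async def wrapped() -> Any:
        try:
            return await factory()
        except Exception:
            # Assumption: callables are invoked, plain values are returned directly.
            return fallback() if callable(fallback) else fallback

    return wrapped


async def failing() -> int:
    raise RuntimeError("boom")


async def working() -> int:
    return 7


async def demo():
    from_value = await fallback_sketch(failing, -1)()
    from_callable = await fallback_sketch(failing, lambda: -2)()
    untouched = await fallback_sketch(working, -1)()
    return from_value, from_callable, untouched


from_value, from_callable, untouched = asyncio.run(demo())
print(from_value, from_callable, untouched)
```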

cuery.asy.with_progress(coro_factory, pbar, progress_callback=None, min_iters=1)#

Wrap a coroutine factory to update a tqdm progress bar after completion, and optionally call an async progress_callback every min_iters steps or on final step.

Parameters:
  • coro_factory (collections.abc.Callable[[], collections.abc.Awaitable])

  • pbar (tqdm.auto.tqdm)

  • progress_callback (collections.abc.Callable[[dict], collections.abc.Awaitable[None]] | None)

  • min_iters (int)

Return type:

collections.abc.Callable[[], collections.abc.Awaitable]

cuery.asy.with_timer(coro_factory, label=None)#

Wrap a coroutine factory with a simple elapsed time logger.

Logs the wall-clock seconds the coroutine took to resolve, whether it succeeded directly or through a fallback (when with_timer is applied outside with_fallback). If the coroutine raises an exception, the timing is still logged before the exception propagates.

Parameters:
  • coro_factory (CoroFactory) – Zero-arg coroutine factory to wrap.

  • label (str | None) – Optional label to include in the log line for identification.

Return type:

CoroFactory
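The "log even on failure" behaviour can be illustrated with a try/finally block. This is a sketch under assumptions: timer_sketch, the logger name, the log format and the ListHandler used to capture messages are all hypothetical and not taken from cuery.asy.

```python
import asyncio
import logging
import time
from typing import Any, Awaitable, Callable, Optional

CoroFactory = Callable[[], Awaitable[Any]]

records: list[str] = []


class ListHandler(logging.Handler):
    # Collects formatted log messages so they can be inspected afterwards.
    def emit(self, record: logging.LogRecord) -> None:
        records.append(record.getMessage())


logger = logging.getLogger("timer_sketch")
logger.setLevel(logging.INFO)
logger.addHandler(ListHandler())


def timer_sketch(factory: CoroFactory, label: Optional[str] = None) -> CoroFactory:
    async def wrapped() -> Any:
        start = time.perf_counter()
        try:
            return await factory()
        finally:
            # Runs on success and on exception alike.
            elapsed = time.perf_counter() - start
            logger.info("%s took %.3fs", label or "coroutine", elapsed)

    return wrapped


async def ok_job() -> str:
    await asyncio.sleep(0.01)
    return "ok"


async def bad_job() -> None:
    raise RuntimeError("boom")


async def demo():
    result = await timer_sketch(ok_job, label="ok_job")()
    try:
        await timer_sketch(bad_job, label="bad_job")()
        raised = False
    except RuntimeError:
        raised = True
    return result, raised


result, raised = asyncio.run(demo())
print(records)
```

Both the successful and the failing call produce one log line each, and the exception from the failing call still reaches the caller.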

cuery.asy.with_policies(coro_factory, timeout=None, semaphore=None, retries=0, wait_max=60, fallback=None, timer=False, pbar=None, progress_callback=None, min_iters=1, label=None)#

Wrap a coroutine factory with multiple policies.

Parameters:
  • coro_factory (CoroFactory)

  • timeout (float | None)

  • semaphore (asyncio.Semaphore | None)

  • retries (int)

  • wait_max (int)

  • fallback (collections.abc.Callable | Any)

  • timer (bool)

  • pbar (tqdm.auto.tqdm | None)

  • progress_callback (collections.abc.Callable[[dict], collections.abc.Awaitable[None]] | None)

  • min_iters (int)

  • label (str | None)

Return type:

CoroFactory

cuery.asy.all_with_policies(func, kwds=None, policies=None, labels=None)#

Create a list of wrapped coroutines for many parameter sets.

Parameters:
  • func (AsyncFunc) – The async function to wrap.

  • kwds (list[dict] | None) – Iterable of keyword-argument dicts.

  • policies (dict[str, Any] | None) – Same keyword arguments as with_policies.

  • labels (str | list | None)

Returns:

List of coroutines, each wrapped with the given policies, ready to be awaited or gathered.

Return type:

list[collections.abc.Coroutine]