cuery.asy#
Composable async “policy” wrappers (timeout, semaphore, retries, fallback, timer, progress) for coroutine factories.
Why factories instead of raw coroutines?#
Each policy wrapper calls the provided zero‑argument callable to obtain a fresh coroutine object every time it needs to run (initial execution, each retry, etc.). This avoids the common pitfall of trying to re‑await an already awaited coroutine and enables clean layering of cross‑cutting concerns.
- Acceptable inputs (“coro factories”):
An
async deffunction with all arguments pre‑bound viafunctools.partialA
lambda: some_async_fn(...)returning a coroutineAny
Callable[[], Awaitable[Any]]fulfilling theCoroFactoryprotocol
- NOT acceptable:
- An already created coroutine object (e.g.
some_async_fn(...)) because it is not callable and cannot be re‑created for retries
- An already created coroutine object (e.g.
A sync function unless it returns an awaitable (would raise
TypeErrorwhen awaited)
Provided wrappers#
with_timeout Apply an asyncio.wait_for timeout (per attempt)
with_semaphore Limit concurrency with an asyncio.Semaphore
with_retries Exponential backoff retries using tenacity (returns after first success)
with_fallback Provide a fallback value or callable if the wrapped factory raises
with_timer Time a single execution attempt and log the elapsed seconds
with_progress Increment a tqdm progress bar and optionally call an
async progress callback
with_policies Convenience function composing any subset of the above in a predictable order
Composition order in with_policies#
The order is: timeout -> semaphore -> retries -> fallback -> timer -> progress. This means, for example, that retries encompass timeout and semaphore acquisition, and that fallback is only engaged after retries are exhausted. The timer (if enabled) measures only the final, successful execution path (after fallback if provided) and excludes progress update overhead. Progress is only updated on successful completions of the wrapped call.
Tenacity retry semantics#
with_retries uses AsyncRetrying with exponential backoff capped by wait_max.
It returns the first successful result. If all attempts fail, it raises the last exception.
Partial objects “just work”#
functools.partial of an async def is itself a zero‑arg callable producing a new
coroutine each invocation, so it satisfies the factory contract even if static type
checkers cannot perfectly infer Callable[[], Awaitable[Any]]. This is why passing
partial(fetch, url, client) to with_policies behaves correctly.
Example
import asyncio
from functools import partial
from asyncio import Semaphore
from tqdm.auto import tqdm
from cuery.asy import with_policies
async def fetch(idx: int) -> int:
await asyncio.sleep(0.1)
if idx % 5 == 0:
raise RuntimeError("boom")
return idx
async def main():
sem = Semaphore(10)
pbar = tqdm(total=100)
async def progress_hook(d: dict): # optional
# d contains tqdm's format_dict (e.g. n, total, rate, etc.)
pass
factories = [partial(fetch, i) for i in range(100)]
wrapped = [
with_policies(
f,
timeout=1.0,
semaphore=sem,
retries=2,
wait_max=5,
fallback=lambda: -1, # return -1 after all retries fail
pbar=pbar,
progress_callback=progress_hook,
min_iters=10,
)
for f in factories
]
results = await asyncio.gather(*[w() for w in wrapped])
pbar.close()
return results
if __name__ == "__main__":
asyncio.run(main())
Design goals#
Small, orthogonal wrappers easy to unit test
Deferred coroutine creation for safe retries / timeouts
Minimal runtime overhead when a policy is unused (simple identity composition)
Clear logging hooks (see retry
after=callback and fallback warning)
Attributes#
A callable that produces an awaitable coroutine when called. |
|
An async function type alias for better readability. |
Functions#
|
Wrap a coroutine factory with a per-run timeout. |
|
Wrap a coroutine factory with concurrency control using a semaphore. |
|
Wrap a coroutine factory with retry logic (tenacity). |
|
Wrap a coroutine factory with fallback if all attempts fail. |
|
Wrap a coroutine factory to update a tqdm progress bar after completion, |
|
Wrap a coroutine factory with a simple elapsed time logger. |
|
Wrap a coroutine factory with multiple policies. |
|
Create a list of wrapped coroutines for many parameter sets. |
Module Contents#
- cuery.asy.CoroFactory#
A callable that produces an awaitable coroutine when called.
- cuery.asy.AsyncFunc#
An async function type alias for better readability.
- cuery.asy.with_timeout(coro_factory, timeout)#
Wrap a coroutine factory with a per-run timeout.
- Parameters:
coro_factory (CoroFactory)
timeout (float)
- Return type:
CoroFactory
- cuery.asy.with_semaphore(coro_factory, semaphore)#
Wrap a coroutine factory with concurrency control using a semaphore.
- Parameters:
coro_factory (CoroFactory)
semaphore (asyncio.Semaphore)
- Return type:
CoroFactory
- cuery.asy.with_retries(coro_factory, attempts=3, wait_max=60)#
Wrap a coroutine factory with retry logic (tenacity).
- Parameters:
coro_factory (CoroFactory)
attempts (int)
wait_max (int)
- Return type:
collections.abc.Callable[[], collections.abc.Awaitable]
- cuery.asy.with_fallback(coro_factory, fallback)#
Wrap a coroutine factory with fallback if all attempts fail.
- Parameters:
coro_factory (CoroFactory)
fallback (collections.abc.Callable | Any)
- Return type:
CoroFactory
- cuery.asy.with_progress(coro_factory, pbar, progress_callback=None, min_iters=1)#
Wrap a coroutine factory to update a tqdm progress bar after completion, and optionally call an async progress_callback every min_iters steps or on final step.
- Parameters:
coro_factory (collections.abc.Callable[[], collections.abc.Awaitable])
pbar (tqdm.auto.tqdm)
progress_callback (collections.abc.Callable[[dict], collections.abc.Awaitable[None]] | None)
min_iters (int)
- Return type:
collections.abc.Callable[[], collections.abc.Awaitable]
- cuery.asy.with_timer(coro_factory, label=None)#
Wrap a coroutine factory with a simple elapsed time logger.
Logs the wall-clock seconds the coroutine took to resolve (successful or via fallback if used outside). If the coroutine raises an exception the timing is still logged before the exception is propagated.
- Parameters:
coro_factory (CoroFactory) – Zero-arg coroutine factory to wrap.
label (str | None) – Optional label to include in the log line for identification.
- Return type:
CoroFactory
- cuery.asy.with_policies(coro_factory, timeout=None, semaphore=None, retries=0, wait_max=60, fallback=None, timer=False, pbar=None, progress_callback=None, min_iters=1, label=None)#
Wrap a coroutine factory with multiple policies.
- Parameters:
coro_factory (CoroFactory)
timeout (float | None)
semaphore (asyncio.Semaphore | None)
retries (int)
wait_max (int)
fallback (collections.abc.Callable | Any)
timer (bool)
pbar (tqdm.auto.tqdm | None)
progress_callback (collections.abc.Callable[[dict], collections.abc.Awaitable[None]] | None)
min_iters (int)
label (str | None)
- Return type:
CoroFactory
- cuery.asy.all_with_policies(func, kwds=None, policies=None, labels=None)#
Create a list of wrapped coroutines for many parameter sets.
- Parameters:
func (AsyncFunc) – The async function to wrap.
kwds (list[dict] | None) – Iterable of keyword-argument dicts.
policies (dict[str, Any] | None) – Same keyword arguments as with_policies.
labels (str | list | None)
- Returns:
List of coroutines, each wrapped with the given policies, ready to be awaited or gathered
- Return type:
list[collections.abc.Coroutine]