# Source code for pynenc_rabbitmq.util.retry

"""
Retry utilities with exponential backoff for RabbitMQ operations.

Provides resilient connection handling by retrying failed operations
with configurable exponential backoff delays.

Key components:
- calculate_backoff_delay: Computes delay for a given retry attempt
- retry_with_backoff: Wrapper that runs a callable and retries it on recoverable errors
- RECOVERABLE_EXCEPTIONS: Tuple of exceptions that trigger retry
"""

import logging
import time
from collections.abc import Callable
from typing import TypeVar

import pika.exceptions

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Generic return type of the operation passed to retry_with_backoff.
T = TypeVar("T")

# Exceptions that indicate a recoverable connection issue.
# retry_with_backoff catches exactly this tuple; any other exception type
# propagates to the caller without a retry.
RECOVERABLE_EXCEPTIONS = (
    # Errors raised by the pika AMQP client.
    pika.exceptions.AMQPConnectionError,
    pika.exceptions.AMQPChannelError,
    pika.exceptions.StreamLostError,
    pika.exceptions.ConnectionClosedByBroker,
    # Built-in network errors. ConnectionResetError and BrokenPipeError are
    # subclasses of ConnectionError, so listing them is redundant but harmless.
    ConnectionError,
    ConnectionResetError,
    BrokenPipeError,
    TimeoutError,
)


def calculate_backoff_delay(
    attempt: int,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
) -> float:
    """
    Calculate exponential backoff delay for a given attempt.

    The delay is ``initial_delay * exponential_base ** attempt``, capped at
    ``max_delay``. If the exponential term is too large to be represented as
    a float, the cap is returned instead of raising ``OverflowError``.

    :param attempt: Current attempt number (0-indexed)
    :param initial_delay: Initial delay in seconds
    :param max_delay: Maximum delay cap in seconds
    :param exponential_base: Base for exponential calculation
    :return: Delay in seconds, never greater than ``max_delay``
    """
    try:
        delay = initial_delay * (exponential_base**attempt)
    except OverflowError:
        # float ** int raises once the result leaves float range (e.g.
        # 2.0 ** 1024), and a huge int power cannot be converted to float.
        # Either way the uncapped delay is far beyond the cap.
        return max_delay
    return min(delay, max_delay)
def retry_with_backoff(
    operation: Callable[[], T],
    operation_name: str,
    max_attempts: int = 0,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    on_retry: Callable[[int, Exception, float], None] | None = None,
) -> T:
    """
    Execute an operation with exponential backoff retry on connection failures.

    Only exceptions listed in ``RECOVERABLE_EXCEPTIONS`` are retried; any
    other exception propagates to the caller immediately.

    :param operation: Zero-argument callable to execute
    :param operation_name: Name for logging purposes
    :param max_attempts: Maximum number of times ``operation`` is invoked,
        counting the first call (0 or negative = retry forever)
    :param initial_delay: Initial delay in seconds before first retry
    :param max_delay: Maximum delay between retries
    :param exponential_base: Base for exponential backoff
    :param on_retry: Optional callback called before each retry with
        (attempt, exception, delay); it runs before the sleep
    :return: Result of the operation
    :raises: The last exception if max_attempts is reached
    """
    attempt = 0
    while True:
        try:
            return operation()
        except RECOVERABLE_EXCEPTIONS as e:
            attempt += 1

            if max_attempts > 0 and attempt >= max_attempts:
                logger.error(
                    "Operation '%s' failed after %d attempts: %s",
                    operation_name,
                    attempt,
                    e,
                )
                # Bare raise keeps the original traceback.
                raise

            try:
                # The first retry uses exponent 0, i.e. exactly initial_delay.
                delay = calculate_backoff_delay(
                    attempt - 1, initial_delay, max_delay, exponential_base
                )
            except OverflowError:
                # Guard against float overflow of the exponential term during
                # very long outages in infinite-retry mode; the capped delay
                # is the correct value in that situation.
                delay = max_delay

            attempt_label = (
                f"{attempt}/{max_attempts}" if max_attempts > 0 else str(attempt)
            )
            logger.warning(
                "RabbitMQ connection error during '%s' (attempt %s): %s. "
                "Retrying in %.1fs...",
                operation_name,
                attempt_label,
                e,
                delay,
            )

            if on_retry is not None:
                on_retry(attempt, e, delay)

            time.sleep(delay)