Source code for pynenc_rabbitmq.util.rabbitmq_queue_mng
"""Queue manager for RabbitMQ operations with automatic retry."""
import logging
from collections.abc import Callable
from typing import TYPE_CHECKING, Any
import pika
from pynenc_rabbitmq.util.retry import RECOVERABLE_EXCEPTIONS, retry_with_backoff
if TYPE_CHECKING:
from pynenc_rabbitmq.util.rabbitmq_conn_mng import ConnectionManager
from pynenc_rabbitmq.util.rabbitmq_queue import QueueSpec
logger = logging.getLogger(__name__)
[docs]
class QueueManager:
"""
Manages queue operations with automatic retry on connection failures.
Wraps all queue operations with exponential backoff retry logic
to handle transient connection issues gracefully.
"""
def __init__(
self, connection_manager: "ConnectionManager", spec: "QueueSpec"
) -> None:
self._connection_manager = connection_manager
self._spec = spec
[docs]
def _get_retry_config(self) -> dict[str, Any]:
"""Get retry configuration from connection manager config."""
conf = self._connection_manager.conf
return {
"max_attempts": conf.rabbitmq_retry_max_attempts,
"initial_delay": conf.rabbitmq_retry_initial_delay,
"max_delay": conf.rabbitmq_retry_max_delay,
"exponential_base": conf.rabbitmq_retry_exponential_base,
}
[docs]
def _retry_operation(
self, operation: Callable[[], Any], operation_name: str
) -> Any:
"""Execute operation with configured retry logic."""
retry_config = self._get_retry_config()
return retry_with_backoff(
operation=operation,
operation_name=f"{operation_name}[{self._spec.name}]",
**retry_config,
)
[docs]
def publish_message(self, message: str) -> bool:
"""
Publish a message to the queue with retry logic.
:param message: Message content to publish
:return: True if successful, False on error
"""
def _publish() -> bool:
try:
with self._connection_manager.get_channel() as channel:
channel.queue_declare(
queue=self._spec.name, durable=self._spec.durable
)
properties = pika.BasicProperties(
delivery_mode=2 if self._spec.durable else 1
)
channel.basic_publish(
exchange=self._spec.exchange,
routing_key=self._spec.routing_key,
body=message.encode(),
properties=properties,
)
return True
except RECOVERABLE_EXCEPTIONS:
# Re-raise recoverable exceptions for retry logic
raise
except Exception as e:
logger.error(
f"Failed to publish message to queue {self._spec.name}: {e}",
exc_info=True,
)
return False
return self._retry_operation(_publish, "publish_message")
[docs]
def consume_message(self) -> str | None:
"""
Consume a single message from the queue with retry logic.
:return: Message content or None if queue is empty
"""
def _consume() -> str | None:
try:
with self._connection_manager.get_channel() as channel:
channel.queue_declare(
queue=self._spec.name, durable=self._spec.durable
)
method, _, body = channel.basic_get(
queue=self._spec.name, auto_ack=True
)
if method:
return body.decode()
return None
except RECOVERABLE_EXCEPTIONS:
raise
except Exception as e:
logger.error(
f"Failed to consume message from queue {self._spec.name}: {e}",
exc_info=True,
)
return None
return self._retry_operation(_consume, "consume_message")
[docs]
def get_message_count(self) -> int:
"""
Get the number of messages in the queue with retry logic.
:return: Number of messages in queue
"""
def _count() -> int:
with self._connection_manager.get_channel() as channel:
result = channel.queue_declare(
queue=self._spec.name, durable=self._spec.durable, passive=True
)
return result.method.message_count
return self._retry_operation(_count, "get_message_count")
[docs]
def purge_queue(self) -> int:
"""
Clear all messages from the queue with retry logic.
:return: Number of messages purged
"""
def _purge() -> int:
with self._connection_manager.get_channel() as channel:
channel.queue_declare(queue=self._spec.name, durable=self._spec.durable)
result = channel.queue_purge(queue=self._spec.name)
return result.method.message_count
return self._retry_operation(_purge, "purge_queue")