Source code for pynenc_rabbitmq.util.rabbitmq_conn_mng

"""RabbitMQ connection management with thread safety."""

import logging
import threading
from contextlib import contextmanager
from typing import TYPE_CHECKING

import pika

if TYPE_CHECKING:
    from collections.abc import Generator

    from pynenc_rabbitmq.conf.config_rabbitmq import ConfigRabbitMq

logger = logging.getLogger(__name__)


[docs] class ConnectionManager: """Manages RabbitMQ connections with thread safety using thread-local storage.""" def __init__(self, conf: "ConfigRabbitMq") -> None: self.conf = conf self._local = threading.local() self._lock = threading.RLock()
[docs] def _get_connection_parameters(self) -> pika.ConnectionParameters: """ Create connection parameters from configuration. :return: Configured connection parameters for RabbitMQ """ credentials = pika.PlainCredentials( self.conf.rabbitmq_username, self.conf.rabbitmq_password ) return pika.ConnectionParameters( host=self.conf.rabbitmq_host, port=self.conf.rabbitmq_port, virtual_host=self.conf.rabbitmq_virtual_host, credentials=credentials, heartbeat=self.conf.rabbitmq_heartbeat, connection_attempts=self.conf.rabbitmq_connection_attempts, retry_delay=self.conf.rabbitmq_retry_delay, )
[docs] def _ensure_connection(self) -> pika.BlockingConnection: """ Ensure we have a valid thread-local connection. Creates one connection per thread since BlockingConnection is not thread-safe. :return: Active BlockingConnection instance for current thread """ # Check if current thread has a connection if ( not hasattr(self._local, "connection") or self._local.connection is None or self._local.connection.is_closed ): with self._lock: # Double-check after acquiring lock if ( not hasattr(self._local, "connection") or self._local.connection is None or self._local.connection.is_closed ): logger.info( "Creating new RabbitMQ connection for thread %s to %s:%s/%s", threading.current_thread().name, self.conf.rabbitmq_host, self.conf.rabbitmq_port, self.conf.rabbitmq_virtual_host, ) params = self._get_connection_parameters() self._local.connection = pika.BlockingConnection(params) logger.info( "RabbitMQ connection established for thread %s", threading.current_thread().name, ) return self._local.connection
[docs] @contextmanager def get_channel( self, ) -> "Generator[pika.adapters.blocking_connection.BlockingChannel, None, None]": """ Get a channel within a context manager for safe cleanup. Each thread gets its own connection, and channels are created per operation. :return: Generator yielding an open channel """ connection = self._ensure_connection() channel = connection.channel() if self.conf.rabbitmq_confirm_delivery: channel.confirm_delivery() channel.basic_qos(prefetch_count=self.conf.rabbitmq_prefetch_count) try: yield channel finally: try: if channel.is_open: channel.close() except Exception as e: logger.debug("Error closing channel (may already be closed): %s", e)
[docs] def close(self) -> None: """Close the thread-local connection if open.""" if hasattr(self._local, "connection") and self._local.connection is not None: try: if not self._local.connection.is_closed: self._local.connection.close() logger.info( "RabbitMQ connection closed for thread %s", threading.current_thread().name, ) except Exception as e: logger.debug("Error closing connection (may already be closed): %s", e) finally: self._local.connection = None