Source code for pynenc_rabbitmq.broker.rabbitmq_broker

from functools import cached_property
from typing import TYPE_CHECKING

from pynenc.broker.base_broker import BaseBroker
from pynenc.identifiers.invocation_id import InvocationId

from pynenc_rabbitmq.conf.config_broker import ConfigBrokerRabbitMq
from pynenc_rabbitmq.util.rabbitmq_client import PynencRabbitMqClient

if TYPE_CHECKING:
    from pynenc.app import Pynenc

    from pynenc_rabbitmq.util.rabbitmq_queue_mng import QueueManager


[docs] class RabbitMqBroker(BaseBroker): """ A RabbitMq-based implementation of the broker for cross-process coordination. Uses RabbitMq queues for cross-process message coordination and implements all required abstract methods from BaseBroker. Connection retry with exponential backoff is handled automatically at the queue manager level. """ def __init__(self, app: "Pynenc") -> None: super().__init__(app) self._client = PynencRabbitMqClient.get_instance(self.conf) @cached_property def conf(self) -> ConfigBrokerRabbitMq: return ConfigBrokerRabbitMq( config_values=self.app.config_values, config_filepath=self.app.config_filepath, ) @cached_property def _message_queue(self) -> "QueueManager": """Get the message queue manager.""" queue_name = f"{self.conf.rabbitmq_queue_prefix}_broker_messages" return self._client.get_queue(queue_name)
[docs] def send_message(self, invocation_id: InvocationId) -> None: """Send a message (invocation) to the queue.""" self._message_queue.publish_message(invocation_id)
[docs] def route_invocation(self, invocation_id: InvocationId) -> None: """Route a single invocation by sending it to the message queue.""" self.send_message(invocation_id)
[docs] def route_invocations(self, invocation_ids: list[InvocationId]) -> None: """Route multiple invocations by sending them to the message queue.""" if not invocation_ids: return self.app.logger.info( f"Routing {len(invocation_ids)} invocations: {invocation_ids}" ) for invocation_id in invocation_ids: self.send_message(invocation_id)
[docs] def retrieve_invocation(self) -> InvocationId | None: """ Retrieve a single invocation from the queue. :return: The next DistributedInvocation in the queue, or None if empty. """ if msg := self._message_queue.consume_message(): return InvocationId(msg) return None
[docs] def count_invocations(self) -> int: """Count the number of invocations in the queue.""" return self._message_queue.get_message_count()
[docs] def purge(self) -> None: """Clear all messages from the queue.""" self._message_queue.purge_queue()