I’m working on a project in which one of our services need to publish messages to a RabbitMQ exchange. Consider this code excerpt from the message queue client:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
# file: mq_client.py import pika class MQ_Client(): connection = None channel = None exchange_name="my_exchange" def connect(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters(host="mq_server") self.channel = connection.channel() def publish(self, message): properties = pika.BasicProperties(content_type='text/json', delivery_mode=1) if self.channel.basic_publish(exchange=self.exchange_name, routing_key='', properties=properties, body=message): log.info("Successfully published this message to exchange \"" + self.exchange_name + "\": " + message) else: log.error("Failed to publish message \"" + message + "\" to the exchange \"" + self.exchange_name + "\"!") raise Exception("Failed to publish message to the queue.") |
In my unit tests I wanted to verify that an unsuccessful attempt to publish the message, would cause trigger an exception to be thrown. To test this, I needed
basic_publish to return False. At first I found it tricky to patch instance methods such as
basic_publish , but after coming across this excellent…
Read More Read More