что такое exchange rabbitmq

RabbitMQ tutorial 3 — Публикация/Подписка

Хочу продолжить серию перевода уроков с официального сайта. Примеры будут на php, но их можно реализовать на большинстве популярных ЯП.

Публикация/Подписка

В предыдущей статье было рассмотрено создание рабочей очереди сообщений. Было сделано допущение, что каждое сообщение будет направлено одному обработчику(worker). В этой статье усложним задачу – отправим сообщение нескольким подписчикам. Этот паттерн известен как «publish/subscribe» (публикация/подписка).
Чтобы понять этот шаблон, создадим простую систему логирования. Она будет состоять из двух программ – первая будет создавать логи, вторая считывать и печатать их.

В нашей систему логирования каждая программа подписчик будет получать каждое сообщение. Благодаря этому, мы сможем запустить одного подписчика на сохранение логов на диск, а потом в любое время сможем создать другого подписчика для отображения логов на экран.

По существу, каждое сообщение будет транслироваться каждому подписчику.

Точки обмена(exchanges)

В предыдущих статьях для отправки и принятия сообщений мы работали с очередью. Теперь рассмотрим расширенную модель отправки сообщений Rabbit.

Основная идея в модели отправки сообщений Rabbit – Поставщик(producer) никогда не отправляет сообщения напрямую в очередь. Фактически, довольно часто поставщик не знает, дошло ли его сообщение до конкретной очереди.
Вместо этого поставщик отправляет сообщение в точку доступа. В точке доступа нет ничего сложного. Точка доступа выполняет две функции:

— получает сообщения от поставщика;
— отправляет эти сообщения в очередь.

Точка доступа точно знает, что делать с поступившими сообщениями. Отправить сообщение в конкретную очередь, либо в несколько очередей, либо не отправлять никому и удалить его. Эти правила описываются в типе точки доступа (exchange type).

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

Существуют несколько типов: direct, topic, headers и fanout. Мы остановимся на последнем типе fanout. Создадим точку с доступа с этим типом и назовем её – logs:

Тип fanout – очень прост. Он копирует все сообщения которые поступают к нему во все очереди, которые ему доступны. Это то что нам нужно для нашей системы логирования.

Просмотр списка точек доступа:

Чтобы посмотреть все точки доступа на сервере, необходимо выполнить команду rabbitmqctl:

Мы видим список точек доступа с наименованием amq.* и точку доступа без имени, которая используется по умолчанию (она не подходит для выполнения нашей задачи).

Наименование точек доступа.

В предыдущих статьях мы ничего не знали о точках доступа, но всё-таки могли отправлять письма в очередь. Это было возможно, потому что использовали точку доступа по умолчанию, которая идентифицируется пустой строкой “”.
Вспомним как раньше мы отправляли письма:

Здесь используется точка доступа по умолчанию или безымянная точка доступа: сообщение направляется в очередь, идентифицированную через ключ “routing_key”. Ключ “routing_key” передается через третий параметр функции basic_publish.

Теперь мы можем отправить сообщение в нашу именованную точку доступа:

Временные очереди:

Всё это время мы использовали наименование очередей (“hello“ или “task_queue”). Возможность давать наименования помогает указать обработчикам (workers) определенную очередь, а также делить очередь между продюсерами и подписчиками.

Но наша система логирования требует, чтобы в очередь поступали все сообщения, а не только часть. Также мы хотим, чтобы сообщения были актуальными, а не старыми. Для этого нам понадобиться 2 вещи:

— Каждый раз когда мы соединяемся с Rabbit, мы создаем новую очередь, или даем создать серверу случайное наименование;
— Каждый раз когда подписчик отключается от Rabbit, мы удаляем очередь.

В php-amqplib клиенте, когда мы обращаемся к очереди без наименовании, мы создаем временную очередь и автоматически сгенерированным наименованием:

Метод вернет автоматически сгенерированное имя очереди. Она может быть такой – ‘amq.gen-JzTY20BRgKO-HjmUJj0wLg.’.
Когда заявленное соединение оборвется, очередь автоматически удалиться.

Переплеты(Bindings)

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

Итак, у нас есть точка доступа с типом fanout и очередь. Сейчас нам нужно сказать точке доступа, чтобы она отправила сообщение в очередь. Отношение между точкой доступа и очередью называется bindings.

С этого момента, сообщения для нашей очереди проходят через точку доступа.
Посмотреть список binding-ов можно используя команду rabbitmqctl list_bindings.

Отправка во все очереди:

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

Программа продюсер, которая создает сообщения, не изменилась с предыдущей статьи. Единственное важное отличие – теперь мы направляем сообщения в нашу именованную точку доступа ‘logs’, вместо точки доступа по умолчанию. Нам нужно было указать имя очереди при отправки сообщения. Но для точки доступа с типом fanout в этом нет необходимости.

Рассмотрим код скрипта emit_log.php:

Как вы видите, после установки соединения мы создаем точку доступа. Этот шаг необходим, так как использование несуществующей точки доступа – запрещено.

Сообщение в точке доступа будут потеряны, так как ни одна очередь не связана с точкой доступа. Но это хорошо для нас: пока нет ни одного подписчика нашей точки доступа, все сообщения могут безопасно удалятся.

Код подписчика receive_logs.php:

Если вы хотите сохранить логи в файл, вам потребуется открыть консоль и набрать:

$ php receive_logs.php > logs_from_rabbit.log
Если вы хотите отобразить логи на экран, откройте еще одно окно и наберите:

$ php receive_logs.php
Ну и, конечно, запуск продюсера сообщений:

С помощью команды rabbitmqctl list_bindings мы можем удостовериться, что код правильно создал очередь и связал её с точкой доступа. С двумя открытыми программами receive_logs.php у вас должно получиться следующее:

В следующей статье будет описано, как прослушать только часть сообщений.

Источник

RabbitMQ. Часть 2. Разбираемся с Exchanges

Exchange — обменник или точка обмена. В него отправляются сообщения. Exchange распределяет сообщение в одну или несколько очередей. Он маршрутизирует сообщения в очередь на основе созданных связей ( bindings ) между ним и очередью.

Exchange не является Erlang-процессом. Из соображений масштабируемости exchange — это строка (ссылка на модуль с кодом, где лежит логика маршрутизации) во встроенной базе данных mnesia. 1 тысяч обменников будут потреблять всего 1МБ памяти.

Оглавление

Direct Exchange

Direct exchange — используется, когда нужно доставить сообщение в определенные очереди. Сообщение публикуется в обменник с определенным ключом маршрутизации и попадает во все очереди, которые связаны с этим обменником аналогичным ключом маршрутизации. Ключ маршрутизации — это строка. Поиск соответствия происходит при помощи проверки строк на эквивалентность.

Графическое представление потока сообщений:

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

В rabbitmq существует понятие обменник по умолчанию. Это direct exchange без имени. Если применяется обменник по умолчанию, то сообщение будет маршрутизироваться в очередь с именем равным ключу маршрутизации сообщения.

Topic Exchange

Графическое представление потока сообщений:

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

Поиск соответствия шаблону осуществляется, начиная с корня и следуя сверху вниз.

Fanout Exchange

Fanout exchange – все сообщения доставляются во все очереди даже если в сообщении задан ключ маршрутизации.

Графическое представление потока сообщений:

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

Headers Exchange

Графическое представление потока сообщений:

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

Consistent-Hashing Exchange

Эквивалентный вес очередей – говорит о том, что в каждую очередь придет примерно одинаковое количество сообщений (каждое сообщение будет помещено только в одну очередь). Полной гарантии равномерного распределения сообщений нет.

Графическое представление потока сообщений:

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

Должен помогать, когда пропускная способность потребителя нуждается в росте более высоком чем решение с несколькими потребителями, использующими одну очередь.

Комбинирование обменников (E2E)

Графическое представление потока сообщений:

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

За счет E2E мы можем найти правильную масштабируемую конфигурацию, которая отвечает как текущим, так и растущим требованиям.

Создание Exchange

Пример создания exchange при помощи RabbitMQ.Client:

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

Обменник должен быть создан перед публикацией сообщений. Если вы опубликуете сообщение в какой-то не существующий обменник — RabbitMQ тихо удалит его.

Создание Exchange через графический интерфейс

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

Заключение

При разработке системы удобно описывать топологию маршрутизации при помощи графа. Но прежде чем начать строить граф стоит выделить пути с большим трафиком, т.к. именно они требуют более высокую пропускную способность (производительность). Далее можно классифицировать трафик. И уже потом приступить к построению.

Комбинации различных exchange должна помочь найти правильную масштабируемую конфигурацию, которая отвечает как текущим, так и растущим требованиям системы.

Количество exchange и очередей должно быть минимально по сравнению с количеством маршрутов.

В следующей статье начнем разбираться подробнее с Queues и Bindings.

В данном разделе опишем обменник кодом на C#, так если бы нам требовалось разработать библиотеку. Возможно это будет полезно для восприятия.

Источник

RabbitMQ. Часть 1. Introduction. Erlang, AMQP

Добрый день, Хабр! Хочу поделиться учебником-справочником знаний, которые мне удалось собрать по RabbitMQ и сжать в короткие рекомендации и выводы.

Оглавление

Кратко про AMQP

AMQP (Advanced Message Queuing Protocol) — открытый протокол для передачи сообщений между компонентами системы. Основная идея состоит в том, что отдельные подсистемы (или независимые приложения) могут обмениваться произвольным образом сообщениями через AMQP-брокер, который осуществляет маршрутизацию, возможно гарантирует доставку, распределение потоков данных, подписку на нужные типы сообщений.

Протокол AMQP вводит три понятия:

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

Протокол работает поверх TCP/IP.

Кратко про Erlang

Исходный код проекта находится в репозитории на GitHub. Архитектура RabbitMQ-server основана на Erlang и BEAM.

Кратко про RabbitMQ

Основная идея модели обмена сообщениями в RabbitMQ заключается в том, что producer (издатель) не отправляет сообщения непосредственно в очередь. На самом деле и довольно часто издатель даже не знает, будет ли сообщение вообще доставлено в какую-либо очередь.

Вместо этого издатель может отправлять сообщения только на обмен. С одной стороны, обмен получает сообщения от издателей, а с другой — отправляет их в очереди. Обмен должен точно знать, что делать с полученным сообщением. Должно ли оно быть добавлено в определенную очередь? Должно ли оно быть добавлено в несколько очередей? Или сообщение нужно игнорировать.

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

Кратко работу RabbitMQ можно описать следующим образом:

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

Подключение и каналы

Для такого обмена информацией между клиентом и сервером используются каналы. Каналы создаются в рамках определенного подключения. Каждый канал изолирован от других каналов. В синхронном случае не возможно выполнять следующую команду, пока не получен ответ.

Для того чтобы иметь возможность отправлять команды параллельно приходится открывать несколько каналов. Каждый канал создает отдельный Erlang процесс. Одно подключение может иметь множество каналов (multiplexing). Для каждого канала существуют некие структуры и объекты в памяти. Поэтому чем больше каналов имеется в рамках соединения, тем больше памяти использует RabbitMQ для управления таким соединением.

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

Простой пример создания подключения и канала при помощи RabbitMQ.Client:

Открывать новое соединение для каждой операции, настоятельно не рекомендуется, поскольку это приведет к большим затратам. Каналы также должны быть постоянными, но многие ошибки протокола приводят к закрытию канала, поэтому срок службы канала может быть короче, чем у соединения.

Где используется RabbitMQ?

В контексте микросервисов протокол AMQP и его реализацию в RabbitMQ часто используют для асинхронного взаимодействия между сервисами.

В контексте IIOT протокол AMQP и его реализацию в RabbitMQ используют для обмена данными между серверами (сервер-сервер). Также используют плагин MQTT Plugin RabbitMQ являющегося реализацией протокола MQTT для передачи данных между датчиком и сервером в низкоскоростных средах с высокой задержкой (полный перечень поддерживаемых протоколов перечислен на сайте проекта).

В следующей статье начнем разбираться подробнее с Exchanges.

Источник

RabbitMQ. Часть 2. Разбираемся с Exchanges

Категории

Свежие записи

Наши услуги

Exchange — обменник или точка обмена. В него отправляются сообщения. Exchange распределяет сообщение в одну или несколько очередей. Он маршрутизирует сообщения в очередь на основе созданных связей ( bindings ) между ним и очередью.

Оглавление

Direct Exchange

Direct exchange — используется, когда нужно доставить сообщение в определенные очереди. Сообщение публикуется в обменник с определенным ключом маршрутизации и попадает во все очереди, которые связаны с этим обменником аналогичным ключом маршрутизации. Ключ маршрутизации — это строка. Поиск соответствия происходит при помощи проверки строк на эквивалентность.

Графическое представление потока сообщений:

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

В rabbitmq существует понятие обменник по умолчанию. Это direct exchange без имени. Если применяется обменник по умолчанию, то сообщение будет маршрутизироваться в очередь с именем равным ключу маршрутизации сообщения.

Topic Exchange

Графическое представление потока сообщений:

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

Поиск соответствия шаблону осуществляется, начиная с корня и следуя сверху вниз.

Fanout Exchange

Fanout exchange – все сообщения доставляются во все очереди даже если в сообщении задан ключ маршрутизации.

Графическое представление потока сообщений:

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

Headers Exchange

Графическое представление потока сообщений:

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

Consistent-Hashing Exchange

Эквивалентный вес очередей – говорит о том, что в каждую очередь придет примерно одинаковое количество сообщений (каждое сообщение будет помещено только в одну очередь). Полной гарантии равномерного распределения сообщений нет.

Графическое представление потока сообщений:

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

Должен помогать, когда пропускная способность потребителя нуждается в росте более высоком чем решение с несколькими потребителями, использующими одну очередь.

Комбинирование обменников (E2E)

Графическое представление потока сообщений:

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

За счет E2E мы можем найти правильную масштабируемую конфигурацию, которая отвечает как текущим, так и растущим требованиям.

Создание Exchange

Пример создания exchange при помощи RabbitMQ.Client :

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

Обменник должен быть создан перед публикацией сообщений. Если вы опубликуете сообщение в какой-то не существующий обменник — RabbitMQ тихо удалит его.

Создание Exchange через графический интерфейс

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

Заключение

При разработке системы удобно описывать топологию маршрутизации при помощи графа. Но прежде чем начать строить граф стоит выделить пути с большим трафиком, т.к. именно они требуют более высокую пропускную способность (производительность). Далее можно классифицировать трафик. И уже потом приступить к построению.

Комбинации различных exchange должна помочь найти правильную масштабируемую конфигурацию, которая отвечает как текущим, так и растущим требованиям системы.

Количество exchange и очередей должно быть минимально по сравнению с количеством маршрутов.

В следующей статье начнем разбираться подробнее с Queues и Bindings.

Источник

RabbitMQ tutorial 1 — Hello World

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

RabbitMQ позволяет взаимодействовать различным программам при помощи протокола AMQP. RabbitMQ является отличным решением для построения SOA (сервис-ориентированной архитектуры) и распределением отложенных ресурсоемких задач.

Под катом перевод первого из шести уроков официального сайта. Примеры на python, но его знание вовсе не обязательно. Аналогичные примеру программы можно воспроизвести практически на любом популярном ЯП. [так выглядят комментарии переводчика, т.е. меня]

Вступление

RabbitMQ ‒ это брокер сообщений. Его основная цель ‒ принимать и отдавать сообщения. Его можно представлять себе, как почтовое отделение: когда Вы бросаете письмо в ящик, Вы можете быть уверены, что рано или поздно почтальон доставит его адресату [видимо, автор ни разу не имел дела с Почтой России]. В этой аналогии RabbitMQ является одновременно и почтовым ящиком, и почтовым отделением, и почтальоном.

Наибольшее отличие RabbitMQ от почтового отделения в том, что он не имеет дела с бумажными конвертами ‒ RabbitMQ принимает, хранит и отдает бинарные данные ‒ сообщения.

В RabbitMQ, а также обмене сообщениями в целом, используется следующая терминология:

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

Поставщик, подписчик и брокер не обязаны находиться на одной физической машине, обычно они находятся на разных.

Hello World!

Первый пример не будет особо сложным ‒ давайте просто отправим сообщение, примем его и выведем на экран. Для этого нам потребуется две программы: одна будет отправлять сообщения, другая ‒ принимать и выводить их на экран.
Общая схема такова:

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

Поставщик отправляет сообщения в очередь с именем «hello», а подписчик получает сообщения из этой очереди.

Библиотека RabbitMQ

RabbitMQ использует протокол AMQP. Для использования RabbitMQ необходима библиотека, поддерживающая этот протокол. Такие библиотеки можно найти практически для каждого языка программирования. Python ‒ не исключение, для него есть несколько библиотек:

Отправка сообщений

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

Наша первая программа send.py будет просто отправлять одно сообщение в очередь.

Мы подключились к брокеру сообщений, находящемуся на локальном хосте. Для подключения к брокеру, находящемуся на другой машине, достаточно заменить «localhost» на IP адрес этой машины.

Перед отправкой сообщения мы должны убедиться, что очередь, получающая сообщение, существует. Если отправить сообщение в несуществующую очередь, RabbitMQ его проигнорирует. Давайте создадим очередь, в которую будет отправлено сообщение, назовем ее «hello»:

Теперь все готово для отправки сообщения. Наше первое сообщение будет содержать строку и будет отправлено в очередь с именем «hello».

Вообще, в RabbitMQ сообщения не отправляются непосредственно в очередь, они должны пройти через exchange (точка обмена). Но сейчас мы не будем заострять на этом внимание, точки обмена будут рассмотрены в третьем уроке. Сейчас достаточно знать, что точку обмена по-умолчанию можно определить, указав пустую строку. Это специальная точка обмена ‒ она позволяет определять, в какую именно очередь отправлено сообщение. Имя очереди должно быть определено в параметре routing_key:

Перед выходом из программы необходимо убедиться, что буфер был очищен и сообщение дошло до RabbitMQ. В этом можно быть уверенным, если использовать безопасное закрытие соединения с брокером.

Получение сообщений

что такое exchange rabbitmq. Смотреть фото что такое exchange rabbitmq. Смотреть картинку что такое exchange rabbitmq. Картинка про что такое exchange rabbitmq. Фото что такое exchange rabbitmq

Наша вторая программа receive.py будет получать сообщения из очереди и выводить их на экран.

Также как и в первой программе сначала необходимо подключиться к RabbitMQ. Для этого следует использовать тот же код, как и ранее. Следующий шаг, как и прежде ‒ убедиться, что очередь существует. Команда queue_declare не будет создавать новую очередь, если она уже существует, поэтому сколько бы раз не была вызвана эта команда, все-равно будет создана только одна очередь.

Вы можете задаться вопросом, почему мы объявляем очередь снова, ведь она уже была объявлена в первой программе. Это нужно, чтобы быть уверенным в существовании очереди, так будет, если сначала будет запущена программа send.py. Но мы не знаем, какая программа будет запущена раньше. В таких случаях лучше объявить очередь в обеих программах.

Мониторинг очередей

Если Вы хотите посмотреть, какие очереди существуют в RabbitMQ на данный момент, Вы можете сделать это с помощью команды rabbitmqctl (потребуются права суперпользователя):

(для Windows ‒ без sudo)

[в нашей компании используют более удобный скрипт мониторинга:]

[скрипт выводит и обновляет каждые 2 секунды таблицу со списком очередей: имя очереди; количество сообщений в обработке; количество сообщений готовых к обработке; общее количество сообщений; устойчивость очереди к перезагрузке сервиса; является ли временной очередью; количество подписчиков]

Получение сообщений из очереди более сложный процесс, чем отправка. Получение осуществляется при помощи подписки с использованием callback функции. При получении каждого сообщения библиотека Pika вызывает эту callback функцию. В нашем примере она будет выводить на экран текст сообщения.

Далее, нам нужно обозначить, что callback функция будет получать сообщения из очереди с именем «hello»:

Здесь необходимо быть уверенным в том, что очередь, на которую мы хотим подписаться, была объявлена. Мы сделали это ранее с помощью команды queue_declare.

Параметр no_ack будет рассмотрен позже [во втором уроке].
И, наконец, запуск бесконечного процесса, который ожидает сообщения из очереди и вызывает callback функцию, когда это необходимо.

Ну а теперь все вместе

Полный код receive.py:

Теперь мы можем попробовать запустить наши программы в терминале. Сначала отправим сообщение при помощи программы send.py:

Выполнение этой программы будет завершаться после отправки каждого сообщения. Теперь сообщение нужно получить:

Отлично! Мы отправили наше первое сообщение через RabbitMQ. Как Вы могли заметить, выполнение программы receive.py не завершилось. Она будет ожидать следующих сообщений, а остановить ее можно, нажав Ctrl+C.
Попробуйте запустить send.py снова в новом окне терминала.

Мы изучили, как отправлять и получать сообщения через именованные очереди. В следующем уроке мы создадим простую очередь задач [ресурсоемких].

UPD: библиотеку, работающую с RabbitMQ, для своего любимого ЯП Вы можете найти на официальном сайте тут.

Источник

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *