Асинхронная обработка#

Асинхронная обработка#

asda

Идея асинхронной обработки событий состоит в том, что возникновение события и его обработка разносятся во времени.

Например, пользователь хочет получить детализацию расходов на мобильную связь. Детализация формируется несколько минут. Можно показать пользователю «песочные часы» и заставить его ждать (синхронная обработка), а можно выслать детализацию по электронной почте, когда она будет готова (асинхронная обработка).Другой пример: интеграция двух систем. Первая система обращается ко второй, передавая пакет сообщений. Обработка одного сообщения занимает несколько секунд, но в пакете может оказаться тысяча сообщений. Можно заставить первую систему ожидать получения результата (синхронная обработка), а можно ответить «работаем», обработать сообщения асинхронно и уже затем сообщить результат.

Асинхронная обработка сложнее синхронной, но часто оказывается очень удобной. Она позволяет работать эффективнее (клиенту не надо простаивать, дожидаясь ответа) и управлять ресурсами (обрабатывать события с удобной скоростью и в удобное время, а не немедленно).(Асинхронная обработка широко применяется и в ядре PostgreSQL. Вспомните режим асинхронной фиксации; процесс контрольной точки; процесс автоочистки.)Обычная реализация состоит в наличии очереди событий (сообщений): одни процессы создают события, другие — обрабатывают.

Возможны более сложные модели, в которых есть возможность публиковать события и подписываться на события нужного типа.

Внешние системы#

asda

Одним из вариантов реализации очередей событий являются внешние системы. Названия многих из них традиционно заканчиваются на MQ — Message Queuing.

Как правило, это большие серьезные системы, обеспечивающие гибкость, масштабируемость, высокую производительность и прочие полезные свойства. К тому же они реализуют один или несколько стандартных протоколов работы с сообщениями, что позволяет интегрировать их с другими системами, понимающими те же протоколы.

Но надо понимать, что любая большая система потребует серьезных затрат на ее изучение и внедрение. Потребуется разобратьсяс особенностями настройки, администрирования, мониторинга. Заметим, что в состав систем работы с очередями входит и отдельная СУБД для надежного хранения очередей.

Кроме того, использование внешней системы приводит ко всем сложностям построения распределенных систем.

При отсутствии глобальных транзакций, объединяющих разные системы, возможны случаи потери сообщений в результате сбоев.

Очереди#

Очереди внутри базы PgQ#

asda

Более простым решением может служить реализация очереди в самой СУБД. Особенно это имеет смысл, если события возникаюти обрабатываются на сервере баз данных.

Наиболее известна система PgQ, разработанная в свое время компанией Skype (https://github.com/pgq).

Эта система достаточно широко используется и про нее известно, что она работает. Если требуется готовое решение, то ей можно и воспользоваться. Поддерживаются версии PostgreSQL 10+.

Из минусов этого решения отметим:

  • Недостаточную гибкость. Например, возможна только пакетная обработка событий. Пока обработчик не пометит пакет, как полностью обработанный,

все события пакета могут быть доставлены повторно в случае сбоя.

  • Отсутствие качественной документации (есть описание API:https://pgq.github.io/extension/pgq/).

  • Необходимость во внешней (относительно СУБД) программе, обеспечивающей работу очереди.

Очереди внутри базы pgmq#

asda

Решение заявлено компанией Tembo как легковесная реализация очередей. Расширение pgmq написано на Rust и использует инфраструктуру сборки расширений pgrx.

Очереди и метаинформация представлены таблицами в базе данных,а элементы очереди («сообщения») — записями в таблицах. Сообщения можно помещать в очередь (в том числе по несколько сразу), читать из очереди, удалять и архивировать. Для выполнения этих основных действий (и еще ряда дополнительных) в расширении предусмотрен набор функций. Детали реализации очереди скрытыот пользователя, что упрощает работу.

Из минусов pgmq отметим следующие:

  • Недостаточная гибкость в управлении на низком уровне. Связано с тем, что расширение pgmq базируется на фреймворке pgrx,в который встроено,

в том числе, управление соединениямис сервером СУБД. Повлиять на него средствами уже собранного pgmq нельзя.

  • Документация недостаточно подробна.

  • В репозитории отсутствуют готовые пакеты, а сборка расширения может оказаться нетривиальной и ресурсоемкой.

https://tembo.io/pgmq/https://github.com/tembo-io/pgmq

ПРАКТИКА#

Очередь средствами расширения pgmq

Создадим базу данных и подключимся к ней:

CREATE DATABASE ext_async;

CREATE DATABASE
\c ext_async

You are now connected to database “ext_async” as user “student”.

Расширение pgmq уже собрано и доступно для установки. Выполним команду создания расширения в нашей базе. Все его объекты будут размещены в схеме pgmq:

CREATE EXTENSION pgmq;

CREATE EXTENSION

Создадим очередь под названием pgmq_queue:

SELECT pgmq.create('pgmq_queue');

 create
--------

(1 row)

Информация об очередях хранится в таблице meta; посмотреть очереди можно с помощью табличной функции:

SELECT * FROM pgmq.list_queues();

 queue_name |          created_at           | is_partitioned | is_unlogged
------------+-------------------------------+----------------+-------------
 pgmq_queue | 2025-11-26 19:13:04.549786+03 | f              | f
(1 row)

Также были созданы таблицы для сообщений очереди: основная q_pgmq_queue и архивная a_pgmq_queue:

\dt pgmq.*

                        List of relations
 Schema |     Name     | Type  |  Owner
--------+--------------+-------+---------
 pgmq   | a_pgmq_queue | table | student
 pgmq   | meta         | table | student
 pgmq   | q_pgmq_queue | table | student
(3 rows)

Поместим в очередь несколько сообщений (полезная информация представляется значением типа jsonb)…

SELECT pgmq.send('pgmq_queue', to_jsonb(i))
FROM (
        VALUES ('alpha'), ('beta'), ('gamma')
) AS v(i);

 send
------
        1
        2
        3
(3 rows)

…и заглянем в основную таблицу очереди:

SELECT msg_id, enqueued_at, message
FROM pgmq.q_pgmq_queue
ORDER BY msg_id;

 msg_id |          enqueued_at          | message
--------+-------------------------------+---------
          1 | 2025-11-26 19:13:04.959321+03 | "alpha"
          2 | 2025-11-26 19:13:04.959321+03 | "beta"
          3 | 2025-11-26 19:13:04.959321+03 | "gamma"
(3 rows)

Простой способ забрать сообщение из очереди — вызвать функцию pop:

SELECT msg_id, enqueued_at, message
FROM pgmq.pop('pgmq_queue');

 msg_id |          enqueued_at          | message
--------+-------------------------------+---------
          1 | 2025-11-26 19:13:04.959321+03 | "alpha"
(1 row)

Другие обработчики тоже могут брать сообщения:

student$ psql -d ext_async
SELECT msg_id, enqueued_at, message
FROM pgmq.pop('pgmq_queue');

 msg_id |          enqueued_at          | message
--------+-------------------------------+---------
          2 | 2025-11-26 19:13:04.959321+03 | "beta"
(1 row)
student$ psql -d ext_async
SELECT msg_id, enqueued_at, message
FROM pgmq.pop('pgmq_queue');

 msg_id |          enqueued_at          | message
--------+-------------------------------+---------
          3 | 2025-11-26 19:13:04.959321+03 | "gamma"
(1 row)

А первый при очередном обращении обнаружит, что очередь пуста:

SELECT msg_id, enqueued_at, message
FROM pgmq.pop('pgmq_queue');

 msg_id | enqueued_at | message
--------+-------------+---------
(0 rows)

И напоследок удалим саму очередь. При этом ее основная и архивная таблицы исчезнут, как и информация в таблице meta:

SELECT pgmq.drop_queue('pgmq_queue');

 drop_queue
------------
 t
(1 row)
\dt pgmq.*

                List of relations
 Schema | Name | Type  |  Owner
--------+------+-------+---------
 pgmq   | meta | table | student
(1 row)

Самостоятельная организация очереди#

asda

Для решения простой задачи, требующей асинхронной обработки, использование сторонних систем может оказаться невыгодным. Возможно, проще написать собственную реализацию, чем приспосабливаться к особенностям сторонней системы.Конечно, нужно понимать, что:

  • реализация должна быть сделана аккуратно, иначе она может привести к проблемам эксплуатации;

  • если к системе очередей предъявляются серьезные требования(или есть шанс, что такие требования появятся в будущем), то развитие, тестирование и поддержка собственного решения,

наоборот, может оказаться невыгодной.

ПРАКТИКА#

Реализация очереди сообщений

Наша задача: реализовать простую очередь сообщений с возможностью конкурентного получения сообщений из нескольких процессов. Полезную информацию снова представим типом JSON — так очередь будет достаточно универсальна.

В каждый конкретный момент времени в таблице сообщений не будет много строк, но за все время работы их может оказаться существенное количество. Поэтому идентификатор надо сразу сделать 64-разрядным:

CREATE TABLE msg_queue(
        id bigint PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
        payload jsonb NOT NULL,
        pid integer DEFAULT NULL -- процесс-обработчик
);

CREATE TABLE

Вставка сообщений в очередь проста:

INSERT INTO msg_queue(payload)
VALUES
        (to_jsonb(1)),
        (to_jsonb(2)),
        (to_jsonb(3));
INSERT 0 3

Теперь займемся функцией получения и блокирования очередного сообщения.

Нам требуется блокировать полученную строку, чтобы одно сообщение не могло быть выбрано два раза (двумя одновременно работающими обработчиками). Это можно сделать с помощью фразы FOR UPDATE:

BEGIN;

BEGIN
SELECT * FROM msg_queue
WHERE pid IS NULL -- никем не обрабатывается
ORDER BY id LIMIT 1 -- одно в порядке поступления
FOR UPDATE;

 id | payload | pid
----+---------+-----
  1 | 1       |
(1 row)

Но в таком случае аналогичный запрос в другом процессе будет заблокирован до завершения первой транзакции.

\c ext_async

You are now connected to database "ext_async" as user "student".
| BEGIN;
  BEGIN
| SELECT * FROM msg_queue
| WHERE pid IS NULL
| ORDER BY id LIMIT 1
| FOR UPDATE;

Вторая транзакция заблокирована.

DELETE FROM msg_queue
WHERE id = 1;

DELETE 1
COMMIT;
COMMIT
|       id | payload | pid
|----+---------+-----
|  2 | 2       |
|(1 row)
| COMMIT;
| COMMIT

Для того чтобы не останавливаться на заблокированных строках, служит фраза SKIP LOCKED команды SELECT.

BEGIN;

BEGIN
SELECT * FROM msg_queue
WHERE pid IS NULL
ORDER BY id LIMIT 1
FOR UPDATE SKIP LOCKED;

 id | payload | pid
----+---------+-----
  2 | 2       |
(1 row)
| BEGIN;
| BEGIN
| SELECT * FROM msg_queue
| WHERE pid IS NULL
| ORDER BY id LIMIT 1
| FOR UPDATE SKIP LOCKED;

| id | payload | pid
| ----+---------+-----
|   3 | 3       |
| (1 row)
COMMIT;
COMMIT
| COMMIT;

| COMMIT

Итак, функция для получения и блокирования очередного сообщения может выглядеть следующим образом:

       CREATE FUNCTION take_message(OUT msg msg_queue)
       LANGUAGE sql VOLATILE
       BEGIN ATOMIC
               UPDATE msg_queue
               SET pid = pg_backend_pid()
               WHERE id = (SELECT id FROM msg_queue
               WHERE pid IS NULL
                       ORDER BY id LIMIT 1
               FOR UPDATE SKIP LOCKED) RETURNING *;
       END;

       CREATE FUNCTION

--------------------------

В практических заданиях к темам «Очистка» и «Фоновые задания» мы рассматривали типичное решение для получения пакета строк таблицы, например, с целью обновления или удаления. Запрос выглядел так:

WITH batch AS (
        SELECT * FROM t
        WHERE /* необходимые условия */
        LIMIT /* размер пакета */
        FOR UPDATE SKIP LOCKED
)

… Как видите, в обоих случаях используется тот же самый подход: выбирается и блокируется часть строк (одна или несколько), при этом уже заблокированные строки пропускаются.

Теперь напишем функцию завершения работы с сообщением. Мы будем просто удалять его из очереди.

       CREATE FUNCTION complete_message(msg msg_queue) RETURNS void
       LANGUAGE sql VOLATILE
       BEGIN ATOMIC
         DELETE FROM msg_queue WHERE id = msg.id;
       END;

       CREATE FUNCTION

--------------------------------

Теперь мы готовы написать цикл обработки сообщений. Оформим его в виде процедуры.

CREATE PROCEDURE process_queue() AS $$
DECLARE
        msg msg_queue;
BEGIN
        LOOP
                SELECT * INTO msg FROM take_message();
                EXIT WHEN msg.id IS NULL;

                -- обработка
                PERFORM pg_sleep(1);
                RAISE NOTICE '[%] processed %; n_tup_del=%, backend_xmin=%',
                        pg_backend_pid(),
                        msg.payload,

                        (SELECT n_tup_del FROM pg_stat_xact_all_tables  -- статистика, накопленная внутри транзакции
                         WHERE relname = 'msg_queue'),

                        (SELECT backend_xmin FROM pg_stat_activity
                         WHERE pid = pg_backend_pid());

                PERFORM complete_message(msg);
        END LOOP;
END;
$$ LANGUAGE plpgsql;

CREATE PROCEDURE

В этом варианте цикл заканчивается, когда в очереди не остается необработанных сообщений. Вместо этого можно не прекращать цикл, но продолжать ожидать новые события, засыпая, например, на одну секунду.

Пробуем.

CALL process_queue();

NOTICE:  [35341] processed 2; n_tup_del=0, backend_xmin=833
NOTICE:  [35341] processed 3; n_tup_del=1, backend_xmin=833
CALL

Теперь в два потока.

INSERT INTO msg_queue(payload)
SELECT to_jsonb(id) FROM generate_series(1,10) id;

INSERT 0 10
\timing on

Timing is on.
CALL process_queue();
| CALL process_queue();

| NOTICE:  [36166] processed 2; n_tup_del=0, backend_xmin=835
| NOTICE:  [36166] processed 4; n_tup_del=1, backend_xmin=835
| NOTICE:  [36166] processed 6; n_tup_del=2, backend_xmin=835
| NOTICE:  [36166] processed 8; n_tup_del=3, backend_xmin=835
| NOTICE:  [36166] processed 10; n_tup_del=4, backend_xmin=835
| CALL
NOTICE:  [35341] processed 1; n_tup_del=0, backend_xmin=835
NOTICE:  [35341] processed 3; n_tup_del=1, backend_xmin=835
NOTICE:  [35341] processed 5; n_tup_del=2, backend_xmin=835
NOTICE:  [35341] processed 7; n_tup_del=3, backend_xmin=835
NOTICE:  [35341] processed 9; n_tup_del=4, backend_xmin=835
CALL

Time: 5009,451 ms (00:05,009)
\timing off
Timing is off.

Обработка 10 сообщений двумя потоками заняла около 5 секунд, но горизонт транзакций держался на одном уровне все время обработки очереди! Это будет мешать выполнению очистки и создавать проблемы для всей базы данных.

ГОРИЗОНТ#

asda

Показанное решение имеет существенный недостаток: вся обработка выполняется в одной длинной транзакции. Вспоминая темы «Многоверсионность» и «Очистка» модуля «Архитектура», можно с уверенностью сказать, что обработка очереди будет мешать нормальной работе очистки.

asda

Чтобы таких проблем не возникало, надо раздробить длинную транзакцию на несколько более коротких. В нашем случае — обрабатывать каждое событие в собственной транзакции.

asda

Более того, обработка одного события тоже может (в принципе) разбиваться на несколько транзакций.

В таком случае мы сначала фиксируем изменение статуса событияв очереди («в работе»), затем выполняем обработку, а в конце фиксируем факт завершения работы с событием (например, удаляем его из таблицы).

ПРАКТИКА#

Учитываем горизонт транзакций Это легко сделать, поскольку процедура позволяет управлять транзакциями.

CREATE OR REPLACE PROCEDURE process_queue() AS $$
DECLARE
        msg msg_queue;
BEGIN
        LOOP
                SELECT * INTO msg FROM take_message();
                COMMIT; --<<
                EXIT WHEN msg.id IS NULL;

                -- обработка
                PERFORM pg_sleep(1);
                RAISE NOTICE '[%] processed %; n_dead_tup=%, n_tup_del=%, backend_xmin=%',
                        pg_backend_pid(),
                        msg.payload,

                        (SELECT n_dead_tup FROM pg_stat_all_tables  -- статистика, учитываемая автоочисткой
                         WHERE relname = 'msg_queue'),

                        (SELECT n_tup_del FROM pg_stat_xact_all_tables
                         WHERE relname = 'msg_queue'),

                        (SELECT backend_xmin FROM pg_stat_activity
                         WHERE pid = pg_backend_pid());

                PERFORM complete_message(msg);
                COMMIT; --<<
        END LOOP;
END;
$$ LANGUAGE plpgsql;

CREATE PROCEDURE

Проверим:

INSERT INTO msg_queue(payload)
SELECT to_jsonb(id) FROM generate_series(1,5) id;

INSERT 0 5
CALL process_queue();

Теперь горизонт транзакций продвигается вперед и не мешает очистке. Однако момент срабатывания автоматической очистки определяется на основе данных статистики, которая обновляется лишь по окончании работы всего оператора CALL.

SELECT n_dead_tup, n_live_tup, n_mod_since_analyze, n_ins_since_vacuum
FROM pg_stat_all_tables
WHERE relname = 'msg_queue';

 n_dead_tup | n_live_tup | n_mod_since_analyze | n_ins_since_vacuum
------------+------------+---------------------+--------------------
                 10 |          0 |                  15 |                  5
(1 row)

Таким образом, чтобы таблица с очередью вовремя очищалась, можно:

  • модифицировать процедуру process_queue таким образом, чтобы обеспечить ее гарантированное периодическое завершение и положиться на автоочистку;

  • периодически запускать обычную (неавтоматическую) очистку. Один из способов реализации — использование фоновых процессов.

Не хватает#

asda

Чего не хватает в нашей реализации?Во-первых, обработчик может завершиться аварийно. Если мы фиксируем изменение статуса обработки, то событие «повиснет»в статусе «в работе» и не будет больше обрабатываться.

В нашей реализации мы уже сделали шаг в нужную сторону: в таблице сохраняется номер обслуживающего процесса (pid), который взял событие в работу. Можно написать простую проверку: если pid имеется в таблице, но процесса с таким номером нет в системе — значит, произошел сбой.Что делать в таком случае? Если обработка события выполняласьв одной транзакции, то она была прервана и, следовательно, можно безопасно вернуть событие в статус «новое» — оно будет обработано повторно.

Если же обработка делится на несколько транзакций, надо быть уверенным в том, что обработку можно запускать повторно.

asda

Во-вторых, наша реализация никак не обрабатывает исключительные ситуации. Это, конечно, несложно добавить. При возникновении исключения хотелось бы иметь информацию о том, что случилось.

Да и если событие обработано без ошибок, может быть полезным сохранять какую-то информацию об обработке. Это, конечно, зависит от конкретной задачи.

Наша реализация удаляет обработанные события из очереди, но вместо этого можно оставлять их, помечая специальным статусом («завершено», «ошибка» и т. п.). Тогда всю информацию об обработке можно иметь непосредственно в таблице с событиями. В таком случае потребуется эффективный доступ к еще не обработанным сообщениям: частичный индекс с условием pid IS NULL. (Другим решением может быть перенос обработанных событий в отдельную таблицу.)

За удобство потребуется платить реализацией периодического удаления «хвоста» очереди — исторических данных. Если период достаточно большой, то, возможно, удаление надо выполнять пакетами — чтобы не допускать лишнего разрастания таблицыи не мешать очистке.

И, поскольку таблица очередей изменяется довольно активно,надо обеспечить ее своевременную очистку, о чем говорилосьв демонстрации.