Простой сервер задач с очередью в MySQL (без проблем с блокировками)

Источник: habrahabr
evgenyl

Почти в каждом более менее динамическом проекте бывает возникает необходимость выполнять очереди задач в фоне (отправка email, обновления кеша, реиндексация поиска и т.д.). Job сервера (Gearman и т.п.) хороши, но для большинства простых задач они избыточны. Классическая реализация очередей в MySQL (при помощи SELECT … LOCK FOR UPDATE) при росте нагрузки со временем начинает приводить к проблемам с блокировкой. Потому, как это обычно бывает, пришлось написать свой "велосипед" для работы с фоновыми задачами, который бы "точно работал" и был предельно прост.

Основа: Cron, PHP 5.3 (mysqli), MySQL > 5.1 - легко "влепить" почти на любой хостинг.
Операция получения (захвата) задачи - атомарна (один UPDATE запрос). Никаких проблем с блокировкой и RC.
Возможность распределения воркерам задач по группам и приоритетам, передача массива данных в исполняемый метод (функцию).
Три режима обработки завершенных задач: переместить запись в отдельную таблицу, удалить запись, оставить запись и отметить как успешно обработанная.
Обработка незавершенных задач или задач, обработанных с ошибкой - на совести разработчика.
На всё про всё 400 строк кода (с полными PHPDOC).
Ограничения: текущая реализация не подходит для persistent соединений, но если кому-то потребуется, несложно допилить. Даже при желании переписать на другой язык :)

Возможность неблокирующей работы с очередью реализована через использование пользовательских переменных в UPDATE запросе с их последующей выборкой. Посвящать этому приему целую статью - глупо. Гораздо приятнее конечная реализация, которую можно применить в дело (Мы же с вами практики, не так ли?). Во всём остальном исключительно классическая очередь с группами и приоритетами. 

Пример использования (клиент):

$task_server = \DBTaskServer::create('localhost', 'root', '', 'testDB', 'jobs_queue');
$task_server->addTask('mywork', $data);


mywork - функция, которая должна быть доступна воркеру. В нее будет передан массив $data. Также возможно указывать вызов статических методов класса.

$task_server->addTask('MyWork::doWork', $data);



Пример воркера:

\DBTaskServer::create('localhost', 'root', '', 'testDB', 'jobs_queue') // Создаем сервер.
        ->setByCLIAgruments($argv) // Устанавливаем параметры вызова из консоли.
        ->setMode(\DBTaskServer::MODE_MARK_AS_COMPLETED) // Выбираем режим обработки.
        ->run(); // Запускам воркера.



Запуск воркера из консоли с параметрами:

/path/to/script/worker.php [max_tasks_per_lifecycle] [comma_separated_group_ids]


Как понятно из названия, первая опция говорит о том сколько максимум задач может выполнить воркер прежде чем завершит работу (если конечно таковые для него будут доступны), вторая опция - это значения group_id заданий, которые данный воркер должен обрабатывать. Если группы не указаны, то воркер обрабатывает любые группы.

Например:

/path/to/script/worker.php 100 3,5,6


Выполнить 100 заданий из групп 3, 5 и 6.
Если заданий не будет найдено, то воркер сразу завершит свою работу.

Добавляем воркера в крон:

0-59/5 * * * * /path/to/script/worker.php 5 3 >/dev/null 2>&1


Каждые 5 минут обрабатывать по 5 заданий с group_id=3.

В архиве примеры клиента, воркера, сам класс сервера (задокументирован), sql файл с таблицей задач.
Качать тут (аж целых 5kB).

Приятного вам кода.


Страница сайта http://www.interface.ru
Оригинал находится по адресу http://www.interface.ru/home.asp?artId=29811