Às vezes você não quer arrastar Kafka ou RabbitMQ para dentro de um projeto só para rodar uma fila em segundo plano bem simples: enviar um e-mail, recalcular um relatório, gerar um PDF. Se você já roda o PostgreSQL, ele mesmo pode ser a fila, e fazer isso de forma confiável. Todo o truque se resume a uma única cláusula: FOR UPDATE SKIP LOCKED. Ela permite que dez workers puxem tarefas de uma única tabela em paralelo sem pisar uns nos outros e sem se acumular atrás dos locks.
Vamos passar por isso com uma tabela jobs ligada a pedidos.
O problema: dois workers pegam a mesma tarefa
Comece com uma tabela de fila. Cada linha é uma tarefa, digamos «enviar a confirmação do pedido».
CREATE TABLE jobs (
id bigserial PRIMARY KEY,
order_id bigint NOT NULL,
status text NOT NULL DEFAULT 'pending',
run_after timestamptz NOT NULL DEFAULT now(),
attempts int NOT NULL DEFAULT 0,
locked_by text,
created_at timestamptz NOT NULL DEFAULT now()
);
INSERT INTO jobs (order_id) VALUES (1001), (1002), (1003);
Um worker ingênuo faz isto:
SELECT id FROM jobs WHERE status = 'pending' ORDER BY id LIMIT 1;
Entre o SELECT e o UPDATE, um segundo worker lê a mesma linha. Resultado: dois workers processam o pedido 1001 e o e-mail é enviado duas vezes. Essa é a clássica condição de corrida.
A correção: FOR UPDATE SKIP LOCKED
O PostgreSQL pode adquirir um lock no nível da linha dentro do próprio SELECT. Adicione FOR UPDATE e a linha fica bloqueada até a transação terminar. Adicione SKIP LOCKED e um worker concorrente não vai esperar por uma linha bloqueada: ele simplesmente a pula e pega a próxima livre.
BEGIN;
SELECT id, order_id
FROM jobs
WHERE status = 'pending'
AND run_after <= now()
ORDER BY id
FOR UPDATE SKIP LOCKED
LIMIT 1;
UPDATE jobs
SET status = 'processing', locked_by = 'worker-7', attempts = attempts + 1
WHERE id = 1;
COMMIT;
Passo a passo:
FOR UPDATE bloqueia as linhas selecionadas até o COMMIT/ROLLBACK.
SKIP LOCKED exclui as linhas já bloqueadas por outras transações, sem esperar.
LIMIT 1 pega exatamente uma tarefa (você também pode pegar um lote, veja abaixo).
ORDER BY id define a ordem; sem ele a fila fica imprevisível.
Rode este bloco em duas sessões ao mesmo tempo: a primeira recebe id = 1, a segunda recebe instantaneamente id = 2. Ninguém espera por ninguém.
Reivindicar um lote em uma única instrução
A transação «SELECT e depois UPDATE» funciona, mas é mais limpo fazer isso em uma única instrução com um CTE, e você ainda pode reivindicar várias tarefas de uma vez para um worker em lote:
WITH picked AS (
SELECT id
FROM jobs
WHERE status = 'pending'
AND run_after <= now()
ORDER BY id
FOR UPDATE SKIP LOCKED
LIMIT 10
)
UPDATE jobs j
SET status = 'processing',
locked_by = 'worker-7',
attempts = j.attempts + 1
FROM picked
WHERE j.id = picked.id
RETURNING j.id, j.order_id;
O RETURNING entrega ao worker as tarefas reivindicadas na hora: id e order_id para processar. O lock dura apenas o tempo desta transação; após o COMMIT as linhas já estão marcadas como processing, então outros workers as pulam de qualquer forma no próximo polling.
Quando o trabalho está concluído, finalize a tarefa:
UPDATE jobs SET status = 'done', locked_by = NULL WHERE id = 1;
E se uma tarefa falhar, recoloque-a na fila com um atraso para que seja repetida mais tarde:
UPDATE jobs
SET status = 'pending',
run_after = now() + interval '30 seconds'
WHERE id = 1 AND attempts < 5;
Índices e desempenho
Sem um índice, o SKIP LOCKED ainda varre a tabela e fica esbarrando em linhas bloqueadas. Adicione um índice parcial com o formato exato da consulta de reivindicação:
CREATE INDEX idx_jobs_pending
ON jobs (run_after, id)
WHERE status = 'pending';
O índice cobre o filtro (status = 'pending', run_after <= now()) e a ordenação (id), e por ser parcial se mantém pequeno: as tarefas concluídas nunca entram nele.
Armadilhas
SKIP LOCKED não faz sentido sem uma transação. O lock vive exatamente até a transação terminar. Rode SELECT ... FOR UPDATE em modo autocommit e a linha é liberada de imediato, então um worker paralelo a pega. Mantenha a reivindicação e a marcação processing em uma única transação (ou em uma única instrução com CTE).
- Não deixe a transação aberta durante o trabalho em si. Se o worker gasta um minuto gerando um PDF, não deixe o
BEGIN aberto esse tempo todo. Reivindique a tarefa, mude-a para processing, faça COMMIT e só então execute o trabalho lento. Caso contrário, uma única transação travada paralisa o VACUUM e acumula locks.
SKIP LOCKED enfraquece a ordenação. Se a primeira tarefa da fila está bloqueada, um worker pega a próxima. Um FIFO estrito não é garantido: tudo bem para uma fila de tarefas, mas não conte com uma ordem exata.
- MySQL e outros engines. O
SKIP LOCKED existe no MySQL 8.0+ (a sintaxe FOR UPDATE SKIP LOCKED é quase idêntica) e no Oracle. O ClickHouse, porém, é colunar e não foi feito para filas com bloqueio de linhas: para uma fila OLTP recorra ao PostgreSQL ou ao MySQL, não a um engine analítico.
Isso basta para rodar uma fila confiável que lida com centenas de tarefas por segundo sem um broker separado. Quando você bater nesse teto, aí sim comece a pensar no Kafka.
Às vezes você não quer arrastar Kafka ou RabbitMQ para dentro de um projeto só para rodar uma fila em segundo plano bem simples: enviar um e-mail, recalcular um relatório, gerar um PDF. Se você já roda o PostgreSQL, ele mesmo pode ser a fila, e fazer isso de forma confiável. Todo o truque se resume a uma única cláusula:
FOR UPDATE SKIP LOCKED. Ela permite que dez workers puxem tarefas de uma única tabela em paralelo sem pisar uns nos outros e sem se acumular atrás dos locks.Vamos passar por isso com uma tabela
jobsligada a pedidos.O problema: dois workers pegam a mesma tarefa
Comece com uma tabela de fila. Cada linha é uma tarefa, digamos «enviar a confirmação do pedido».
CREATE TABLE jobs ( id bigserial PRIMARY KEY, order_id bigint NOT NULL, status text NOT NULL DEFAULT 'pending', -- pending | processing | done | failed run_after timestamptz NOT NULL DEFAULT now(), attempts int NOT NULL DEFAULT 0, locked_by text, created_at timestamptz NOT NULL DEFAULT now() ); INSERT INTO jobs (order_id) VALUES (1001), (1002), (1003);Um worker ingênuo faz isto:
-- broken with multiple workers SELECT id FROM jobs WHERE status = 'pending' ORDER BY id LIMIT 1; -- ...then a separate UPDATE ... SET status = 'processing'Entre o
SELECTe oUPDATE, um segundo worker lê a mesma linha. Resultado: dois workers processam o pedido 1001 e o e-mail é enviado duas vezes. Essa é a clássica condição de corrida.A correção: FOR UPDATE SKIP LOCKED
O PostgreSQL pode adquirir um lock no nível da linha dentro do próprio
SELECT. AdicioneFOR UPDATEe a linha fica bloqueada até a transação terminar. AdicioneSKIP LOCKEDe um worker concorrente não vai esperar por uma linha bloqueada: ele simplesmente a pula e pega a próxima livre.BEGIN; SELECT id, order_id FROM jobs WHERE status = 'pending' AND run_after <= now() ORDER BY id FOR UPDATE SKIP LOCKED LIMIT 1; -- the worker sees, say, id = 1 and claims it UPDATE jobs SET status = 'processing', locked_by = 'worker-7', attempts = attempts + 1 WHERE id = 1; COMMIT;Passo a passo:
FOR UPDATEbloqueia as linhas selecionadas até oCOMMIT/ROLLBACK.SKIP LOCKEDexclui as linhas já bloqueadas por outras transações, sem esperar.LIMIT 1pega exatamente uma tarefa (você também pode pegar um lote, veja abaixo).ORDER BY iddefine a ordem; sem ele a fila fica imprevisível.Rode este bloco em duas sessões ao mesmo tempo: a primeira recebe
id = 1, a segunda recebe instantaneamenteid = 2. Ninguém espera por ninguém.Reivindicar um lote em uma única instrução
A transação «SELECT e depois UPDATE» funciona, mas é mais limpo fazer isso em uma única instrução com um CTE, e você ainda pode reivindicar várias tarefas de uma vez para um worker em lote:
WITH picked AS ( SELECT id FROM jobs WHERE status = 'pending' AND run_after <= now() ORDER BY id FOR UPDATE SKIP LOCKED LIMIT 10 ) UPDATE jobs j SET status = 'processing', locked_by = 'worker-7', attempts = j.attempts + 1 FROM picked WHERE j.id = picked.id RETURNING j.id, j.order_id;O
RETURNINGentrega ao worker as tarefas reivindicadas na hora:ideorder_idpara processar. O lock dura apenas o tempo desta transação; após oCOMMITas linhas já estão marcadas comoprocessing, então outros workers as pulam de qualquer forma no próximo polling.Quando o trabalho está concluído, finalize a tarefa:
UPDATE jobs SET status = 'done', locked_by = NULL WHERE id = 1;E se uma tarefa falhar, recoloque-a na fila com um atraso para que seja repetida mais tarde:
UPDATE jobs SET status = 'pending', run_after = now() + interval '30 seconds' WHERE id = 1 AND attempts < 5;Índices e desempenho
Sem um índice, o
SKIP LOCKEDainda varre a tabela e fica esbarrando em linhas bloqueadas. Adicione um índice parcial com o formato exato da consulta de reivindicação:CREATE INDEX idx_jobs_pending ON jobs (run_after, id) WHERE status = 'pending';O índice cobre o filtro (
status = 'pending',run_after <= now()) e a ordenação (id), e por ser parcial se mantém pequeno: as tarefas concluídas nunca entram nele.Armadilhas
SKIP LOCKEDnão faz sentido sem uma transação. O lock vive exatamente até a transação terminar. RodeSELECT ... FOR UPDATEem modo autocommit e a linha é liberada de imediato, então um worker paralelo a pega. Mantenha a reivindicação e a marcaçãoprocessingem uma única transação (ou em uma única instrução com CTE).BEGINaberto esse tempo todo. Reivindique a tarefa, mude-a paraprocessing, façaCOMMITe só então execute o trabalho lento. Caso contrário, uma única transação travada paralisa oVACUUMe acumula locks.SKIP LOCKEDenfraquece a ordenação. Se a primeira tarefa da fila está bloqueada, um worker pega a próxima. Um FIFO estrito não é garantido: tudo bem para uma fila de tarefas, mas não conte com uma ordem exata.SKIP LOCKEDexiste no MySQL 8.0+ (a sintaxeFOR UPDATE SKIP LOCKEDé quase idêntica) e no Oracle. O ClickHouse, porém, é colunar e não foi feito para filas com bloqueio de linhas: para uma fila OLTP recorra ao PostgreSQL ou ao MySQL, não a um engine analítico.Isso basta para rodar uma fila confiável que lida com centenas de tarefas por segundo sem um broker separado. Quando você bater nesse teto, aí sim comece a pensar no Kafka.