Parallel execution with automatic retry. How does it work?
Two tasks run in parallel. One fails and automatically retries. After both succeed, they trigger the final publish step.
1. Set up queues
SELECT pgmq.create('scrape_queue');
SELECT pgmq.create('summarize_queue');
SELECT pgmq.create('extract_keywords_queue');

2. Scrape processor Edge Function
import { serve } from 'std/http/server.ts';
import { createClient } from 'npm:@supabase/supabase-js';
import { scrapeWebsite } from './scraper.ts';

serve(async () => {
  const supabase = createClient(/*...*/);

  // Read one message from scrape queue
  const { data: messages } = await supabase.rpc('pgmq_read', {
    queue_name: 'scrape_queue',
    vt: 30, // 30 second visibility timeout
    qty: 1
  });

  if (!messages?.[0]) {
    return new Response('No messages', { status: 200 });
  }

  const { msg_id, message } = messages[0];
  const { url } = message;

  try {
    // Scrape the website
    const content = await scrapeWebsite(url);

    // Store in articles table (state tracking)
    const { data: article } = await supabase
      .from('articles')
      .insert({ url, content })
      .select()
      .single();

    // Send to BOTH parallel queues
    await supabase.rpc('pgmq_send', {
      queue_name: 'summarize_queue',
      msg: { url }
    });

    await supabase.rpc('pgmq_send', {
      queue_name: 'extract_keywords_queue',
      msg: { url }
    });

    // Delete from scrape queue
    await supabase.rpc('pgmq_delete', {
      queue_name: 'scrape_queue',
      msg_id
    });
  } catch (error) {
    console.error('Scrape failed:', error);
    // Message becomes visible again after timeout
  }

  return new Response('OK');
});

3. Summarize processor Edge Function
import { serve } from 'std/http/server.ts';
import { createClient } from 'npm:@supabase/supabase-js';
import { summarizeContent } from './summarizer.ts';

serve(async () => {
  const supabase = createClient(/*...*/);

  // Read one message from summarize queue
  const { data: messages } = await supabase.rpc('pgmq_read', {
    queue_name: 'summarize_queue',
    vt: 30,
    qty: 1
  });

  if (!messages?.[0]) {
    return new Response('No messages', { status: 200 });
  }

  const { msg_id, message } = messages[0];
  const { url } = message;

  try {
    // Fetch article from state table
    const { data: article } = await supabase
      .from('articles')
      .select('content')
      .eq('url', url)
      .single();

    // Summarize content
    const summary = await summarizeContent(article.content);

    // Update state table with summary
    await supabase
      .from('articles')
      .update({ summary })
      .eq('url', url);

    // Delete from summarize queue
    await supabase.rpc('pgmq_delete', {
      queue_name: 'summarize_queue',
      msg_id
    });
  } catch (error) {
    console.error('Summarize failed:', error);
    // Message becomes visible again after timeout
  }

  return new Response('OK');
});

4. Extract keywords processor Edge Function
import { serve } from 'std/http/server.ts';
import { createClient } from 'npm:@supabase/supabase-js';
import { extractKeywords } from './extractor.ts';

serve(async () => {
  const supabase = createClient(/*...*/);

  // Read one message from extract queue
  const { data: messages } = await supabase.rpc('pgmq_read', {
    queue_name: 'extract_keywords_queue',
    vt: 30,
    qty: 1
  });

  if (!messages?.[0]) {
    return new Response('No messages', { status: 200 });
  }

  const { msg_id, message } = messages[0];
  const { url } = message;

  try {
    // Fetch article from state table
    const { data: article } = await supabase
      .from('articles')
      .select('content')
      .eq('url', url)
      .single();

    // Extract keywords
    const keywords = await extractKeywords(article.content);

    // Update state table with keywords
    await supabase
      .from('articles')
      .update({ keywords })
      .eq('url', url);

    // Delete from extract queue
    await supabase.rpc('pgmq_delete', {
      queue_name: 'extract_keywords_queue',
      msg_id
    });
  } catch (error) {
    console.error('Extract failed:', error);
    // Message becomes visible again after timeout
  }

  return new Response('OK');
});

5. Publish articles cron job
-- Poll for completed articles and publish them
SELECT cron.schedule(
  'publish-articles',
  '*/15 * * * * *',
  $$
  UPDATE articles
  SET published = true
  WHERE published = false
    AND summary IS NOT NULL
    AND keywords IS NOT NULL
  $$
);

6. Schedule queue processors
-- Scrape processor runs every 15 seconds
SELECT cron.schedule(
  'scrape-processor',
  '*/15 * * * * *',
  $$
  SELECT net.http_post(
    url := 'https://your-project.supabase.co/functions/v1/scrape-processor'
  )
  $$
);

-- Summarize processor runs every 15 seconds
SELECT cron.schedule(
  'summarize-processor',
  '*/15 * * * * *',
  $$
  SELECT net.http_post(
    url := 'https://your-project.supabase.co/functions/v1/summarize-processor'
  )
  $$
);

-- Extract keywords processor runs every 15 seconds
SELECT cron.schedule(
  'extract-keywords-processor',
  '*/15 * * * * *',
  $$
  SELECT net.http_post(
    url := 'https://your-project.supabase.co/functions/v1/extract-keywords-processor'
  )
  $$
);

7. Trigger workflow
// Add URL to scrape queue to start the workflow
await supabase.rpc('pgmq_send', {
  queue_name: 'scrape_queue',
  msg: { url: 'https://example.com' }
});

Based on Supabase’s recommended pattern. Read the full guide →
import { Flow } from 'npm:@pgflow/dsl';
new Flow<{ url: string }>({ slug: 'analyzeArticle' })
  .step({ slug: 'fetchArticle' }, (input) =>
    scrapeWebsite(input.run.url)
  )
  .step({ slug: 'summarize', dependsOn: ['fetchArticle'] }, (input) =>
    summarizeContent(input.fetchArticle)
  )
  .step({ slug: 'extractKeywords', dependsOn: ['fetchArticle'] }, (input) =>
    extractKeywords(input.fetchArticle)
  )
  .step({ slug: 'publish', dependsOn: ['summarize', 'extractKeywords'] }, (input) =>
    publishArticle({
      url: input.run.url,
      content: input.fetchArticle,
      summary: input.summarize,
      keywords: input.extractKeywords
    })
  );
npx pgflow@latest install

✨ No manual wiring
Skip the tedious pg_cron → pgmq → Edge Function setup. No manual queue wiring, no archive code, no state table management. pgflow handles all the plumbing - you just define your workflow. Learn more →
📦 Runs entirely in Supabase
Everything in your existing Supabase project. No Bull, no Redis, no Temporal, no Railway. No external services, no vendor dashboards, no additional infrastructure to manage. Learn more →
👁️ Full observability in SQL
All workflow state lives in Postgres tables. Query execution history, inspect step outputs, and debug failures with standard SQL. No hidden state, no external dashboards. Learn more →
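For example, recent runs of a flow can be listed straight from psql or the SQL editor. A minimal sketch, assuming pgflow keeps run state in a pgflow schema; the table and column names below are illustrative, not the confirmed schema, so check the pgflow docs:

-- Illustrative sketch: pgflow.runs and its columns are assumptions here.
SELECT run_id, status, started_at, completed_at
FROM pgflow.runs
WHERE flow_slug = 'analyzeArticle'
ORDER BY started_at DESC
LIMIT 10;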
🔄 Automatic retries
Built-in retry logic with exponential backoff for flaky AI APIs. When OpenAI times out or rate-limits, only that step retries - your workflow continues. Configure max attempts and delays per step, no retry code needed. Learn more →
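A rough sketch of what per-step configuration can look like, reusing the scrapeWebsite and summarizeContent helpers from the example above; the option names (maxAttempts, baseDelay) are assumptions, so check the pgflow docs for the exact retry settings:

import { Flow } from 'npm:@pgflow/dsl';
import { scrapeWebsite } from './scraper.ts';
import { summarizeContent } from './summarizer.ts';

// Sketch only: the maxAttempts/baseDelay option names are assumptions.
new Flow<{ url: string }>({ slug: 'analyzeArticle', maxAttempts: 3, baseDelay: 1 })
  .step({ slug: 'fetchArticle' }, (input) => scrapeWebsite(input.run.url))
  .step(
    // Flaky AI call: allow more attempts with a longer backoff base
    { slug: 'summarize', dependsOn: ['fetchArticle'], maxAttempts: 5, baseDelay: 5 },
    (input) => summarizeContent(input.fetchArticle)
  );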
⚡ Parallel array processing
Process arrays in parallel with independent retries per item. Batch 100 embeddings - if 3 fail, only those 3 retry while others continue. Perfect for AI workloads with unreliable APIs. Learn more →
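A hypothetical sketch of the idea: one step returns an array, and a fan-out step processes each element as its own retryable task. The .map() method, its array option, and the createEmbedding helper are illustrative assumptions rather than the confirmed pgflow API; see the docs for the real map-step syntax:

import { Flow } from 'npm:@pgflow/dsl';
import { createEmbedding } from './embeddings.ts'; // hypothetical helper

// Hypothetical API shape: .map() and its `array` option are assumptions.
new Flow<{ texts: string[] }>({ slug: 'embedTexts' })
  .step({ slug: 'chunks' }, (input) => input.run.texts)   // step output is an array
  .map({ slug: 'embed', array: 'chunks', maxAttempts: 3 }, (text) =>
    createEmbedding(text)                                  // retried per item on failure
  );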
🔌 Trigger from anywhere
Start workflows from database triggers, scheduled pg_cron jobs, browser clients, or RPC calls. Ultimate flexibility in how you start your workflows. Learn more →
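For instance, a run can be started from plain SQL, which is what a database trigger or a pg_cron schedule would call; the pgflow.start_flow signature shown here is an assumption, so verify it against the pgflow docs:

-- Sketch: start a run from SQL (works from a trigger body or a pg_cron job).
-- The exact pgflow.start_flow signature is an assumption.
SELECT pgflow.start_flow('analyzeArticle', '{"url": "https://example.com"}'::jsonb);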
"Got a flow up and running, this is bad ass. I love that everything just goes into Postgres."
— @cpursley (Discord)
"EdgeWorker is clearly a building block missing from supabase. I toyed with it locally for several days and it worked great."
— @nel (Discord)
"I'm really enjoying PQFlow — this is incredibly powerful! I'm implementing it in a flow with RAG."
— @Alipio Pereira (Discord)
"I was for a while now feeling like there had to be a better way of handling workflows on a supabase project without needing to add something like inngest or n8n to my tech stack. It clicked really hard when I found out about your project"
— @_perhaps (Discord)
"I was searching through the docs for something like this and I'm quite surprised it's not part of Supabase already. A queue feels kinda useless with serverless runners if I need to trigger them manually"
— @TapTap2121 (Reddit)
"Exactly what I was looking for without even knowing it :) well I knew I need smt like this, but I thought I'd had to build a very rudimentary version myself. Thank you for saving me tons of time"
— @CjHuber (Hacker News)
"I was trying to build my own after having a couple of pgmq queues... you found a problem and you are giving a very good solution"
— @enciso (Discord)