Building a Real-Time S&P 500 Market Data Pipeline
How I built a serverless market performance tracker using Node.js, BigQuery, and Pub/Sub to process millions of stock ticks without breaking the bank.
Last year, I got pulled into a fintech project that needed to track S&P 500 stock performance in near real-time. The client wanted a dashboard showing which stocks were in bull or bear territory, with historical performance metrics updated every few seconds. Sounds straightforward until you realize you're dealing with 500 stocks × thousands of price updates per day × historical data going back years.
This bit me hard at first. I naively started with a simple Express API hitting a REST endpoint every few seconds. That lasted about 20 minutes in production before we hit rate limits and our Heroku dyno started sweating.
Here's what I learned building a proper market data pipeline that actually scales.
Architecture Overview: Why Serverless Won
I've built enough real-time systems to know that WebSockets + a single server is a recipe for pain at scale. For this project, we went with GCP because their Pub/Sub + Cloud Functions combo is honestly perfect for event-driven market data.
The architecture looks like this:
- Data ingestion: Cloud Function triggered every 5 seconds to fetch market data from Alpha Vantage API
- Message queue: Pub/Sub topic for raw market ticks
- Processing: Another Cloud Function subscribes to process and enrich data
- Storage: BigQuery for historical data, Redis for real-time cache
- API: Cloud Run service (Node.js/Express) serving the frontend
- Frontend: Next.js dashboard with Server-Sent Events for live updates
The gotcha here is cost. BigQuery's on-demand pricing bills for the bytes each query scans, so caching aggressively in Redis saved us probably $300/month. It works because most users want the same data (current bull market leaders, top performers, etc.).
Ingesting Market Data with Cloud Functions
The first Cloud Function is dead simple. It runs on a schedule and publishes raw market data to Pub/Sub.
import { PubSub } from '@google-cloud/pubsub';
import axios from 'axios';
const pubsub = new PubSub();
const topic = pubsub.topic('market-data-raw');
interface StockQuote {
symbol: string;
price: number;
change: number;
changePercent: number;
timestamp: string;
}
export const fetchMarketData = async (req: any, res: any) => {
try {
const SP500_SYMBOLS = await getSP500Symbols(); // cached list
// Batch requests to avoid rate limits
const batches = chunkArray(SP500_SYMBOLS, 100);
for (const batch of batches) {
const promises = batch.map(symbol =>
fetchQuote(symbol).catch(err => {
console.error(`Failed to fetch ${symbol}:`, err.message);
return null;
})
);
const quotes = (await Promise.all(promises)).filter(Boolean) as StockQuote[];
// Publish to Pub/Sub
const messageId = await topic.publishMessage({
json: {
quotes,
timestamp: new Date().toISOString(),
batch: batches.indexOf(batch)
}
});
console.log(`Published batch ${batches.indexOf(batch)}, messageId: ${messageId}`);
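// Brief pause between batches to stay under the provider's rate limit (the 1000 ms value is an assumption; tune it to your plan)
await new Promise(resolve => setTimeout(resolve, 1000));
}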
res.status(200).send({ success: true, processed: SP500_SYMBOLS.length });
} catch (error) {
console.error('Ingestion error:', error);
res.status(500).send({ error: 'Ingestion failed' });
}
};
async function fetchQuote(symbol: string): Promise<StockQuote | null> {
const response = await axios.get(
`https://www.alphavantage.co/query?function=GLOBAL_QUOTE&symbol=${symbol}&apikey=${process.env.ALPHA_VANTAGE_KEY}`
);
const data = response.data['Global Quote'];
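// Treat a missing or empty quote object (e.g. unknown symbol or throttled response) as "no data"
if (!data || !data['05. price']) return null;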
return {
symbol,
price: parseFloat(data['05. price']),
change: parseFloat(data['09. change']),
changePercent: parseFloat(data['10. change percent'].replace('%', '')),
timestamp: new Date().toISOString()
};
}
function chunkArray<T>(array: T[], size: number): T[][] {
const chunks: T[][] = [];
for (let i = 0; i < array.length; i += size) {
chunks.push(array.slice(i, i + size));
}
return chunks;
}
The key insight here is batching. Alpha Vantage (and most market data APIs) have aggressive rate limits. We process 100 symbols at a time with a small delay between batches. Not elegant, but it works.
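The batch-then-pause pattern can be sketched as a small generic helper. This is a minimal illustration rather than the pipeline's actual code: `processInBatches` and `sleep` are hypothetical names, and the worker stands in for something like `fetchQuote`.

```typescript
// Hypothetical helpers, not part of the original pipeline.
const sleep = (ms: number): Promise<void> =>
  new Promise<void>(resolve => setTimeout(resolve, ms));

// Runs `worker` over `items` in batches of `batchSize`. Items inside a batch
// run concurrently; batches run one after another with `delayMs` in between.
// Items whose worker rejects are skipped instead of failing the whole run.
async function processInBatches<T, R>(
  items: T[],
  batchSize: number,
  delayMs: number,
  worker: (item: T) => Promise<R>
): Promise<R[]> {
  if (batchSize < 1) throw new Error('batchSize must be at least 1');
  const results: R[] = [];
  for (let start = 0; start < items.length; start += batchSize) {
    const batch = items.slice(start, start + batchSize);
    const outcomes = await Promise.all(
      batch.map(item =>
        worker(item).then(
          value => ({ ok: true as const, value }),
          () => ({ ok: false as const })
        )
      )
    );
    for (const outcome of outcomes) {
      if (outcome.ok) results.push(outcome.value);
    }
    // Pause only if another batch is still to come.
    if (start + batchSize < items.length) await sleep(delayMs);
  }
  return results;
}
```

With 500 symbols, a batch size of 100, and a one-second pause, a full sweep has five batches and four pauses, so the delay adds roughly four seconds on top of the request time.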
Processing and Enrichment Pipeline
The second Cloud Function subscribes to the Pub/Sub topic and does the heavy lifting: calculating bull/bear indicators, performance metrics, and writing to BigQuery.
import { BigQuery } from '@google-cloud/bigquery';
import Redis from 'ioredis';
const bigquery = new BigQuery();
const redis = new Redis(process.env.REDIS_URL);
interface EnrichedQuote extends StockQuote {
marketStatus: 'bull' | 'bear' | 'neutral';
performance30d: number;
performance90d: number;
volatility: number;
}
export const processMarketData = async (message: any, context: any) => {
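// Pub/Sub-triggered functions receive the payload as a base64-encoded string in message.data
const { quotes, timestamp } = JSON.parse(Buffer.from(message.data, 'base64').toString());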
const enrichedQuotes: EnrichedQuote[] = await Promise.all(
quotes.map(async (quote: StockQuote) => {
// Fetch enough history from BigQuery to cover the 200-day moving average used below
const historical = await getHistoricalData(quote.symbol, 200);
const performance30d = calculatePerformance(historical, 30);
const performance90d = calculatePerformance(historical, 90);
const volatility = calculateVolatility(historical);
// Determine market status
const marketStatus = determineMarketStatus(quote, historical);
return {
...quote,
marketStatus,
performance30d,
performance90d,
volatility
};
})
);
// Write to BigQuery for long-term storage
await insertIntoBigQuery(enrichedQuotes);
// Update Redis cache for real-time API
await updateRedisCache(enrichedQuotes);
console.log(`Processed ${enrichedQuotes.length} quotes at ${timestamp}`);
};
function determineMarketStatus(
quote: StockQuote,
historical: any[]
): 'bull' | 'bear' | 'neutral' {
// Bull market: price above 200-day MA and trending up
const ma200 = calculateMovingAverage(historical, 200);
const ma50 = calculateMovingAverage(historical, 50);
if (quote.price > ma200 && ma50 > ma200) {
return 'bull';
} else if (quote.price < ma200 && ma50 < ma200) {
return 'bear';
}
return 'neutral';
}
function calculatePerformance(historical: any[], days: number): number {
if (historical.length < days) return 0;
const current = historical[0].price;
const past = historical[days - 1].price;
return ((current - past) / past) * 100;
}
function calculateMovingAverage(data: any[], period: number): number {
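// Too little history: return NaN so comparisons against it are false and the status stays 'neutral'
if (data.length < period) return NaN;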
const sum = data.slice(0, period).reduce((acc, d) => acc + d.price, 0);
return sum / period;
}
function calculateVolatility(data: any[]): number {
// Standard deviation of returns
const returns = data.slice(0, -1).map((d, i) =>
(d.price - data[i + 1].price) / data[i + 1].price
);
const mean = returns.reduce((a, b) => a + b, 0) / returns.length;
const variance = returns.reduce((acc, r) => acc + Math.pow(r - mean, 2), 0) / returns.length;
return Math.sqrt(variance);
}
Redis Caching Strategy for Performance
Without Redis, every API request would hit BigQuery. That gets expensive fast. We cache three types of data with different TTLs:
async function updateRedisCache(quotes: EnrichedQuote[]) {
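// Nothing to cache; this also avoids calling reduce() on an empty array below
if (quotes.length === 0) return;
const pipeline = redis.pipeline();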
// 1. Individual stock data (TTL: 5 seconds)
quotes.forEach(quote => {
pipeline.setex(
`stock:${quote.symbol}`,
5,
JSON.stringify(quote)
);
});
// 2. Bull market leaders (sorted set, TTL: 10 seconds)
const bullStocks = quotes
.filter(q => q.marketStatus === 'bull')
.sort((a, b) => b.performance30d - a.performance30d)
.slice(0, 50);
pipeline.del('bull:leaders');
bullStocks.forEach((stock, idx) => {
pipeline.zadd('bull:leaders', idx, JSON.stringify(stock));
});
pipeline.expire('bull:leaders', 10);
// 3. Market overview stats (TTL: 30 seconds)
const stats = {
totalBull: quotes.filter(q => q.marketStatus === 'bull').length,
totalBear: quotes.filter(q => q.marketStatus === 'bear').length,
avgPerformance: quotes.reduce((sum, q) => sum + q.performance30d, 0) / quotes.length,
topPerformer: quotes.reduce((top, q) => q.performance30d > top.performance30d ? q : top),
timestamp: new Date().toISOString()
};
pipeline.setex('market:stats', 30, JSON.stringify(stats));
await pipeline.exec();
}
This tiered caching strategy reduced our BigQuery costs by 85%. The tradeoff is slightly stale data, but in practice, 5-second delays are perfectly fine for this use case.
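To make the TTL tiers and the read-through lookup concrete without needing a Redis instance, here is a minimal in-memory sketch. `TtlCache`, `TTL_SECONDS`, and `readThrough` are hypothetical names used only for illustration; the real system uses Redis `SETEX`/`GET` as shown above.

```typescript
// In-memory stand-in for Redis SETEX/GET so the TTL behaviour can run anywhere.
class TtlCache {
  private entries = new Map<string, { value: string; expiresAt: number }>();
  private now: () => number;

  // The clock is injectable so expiry can be exercised without waiting.
  constructor(now: () => number = () => Date.now()) {
    this.now = now;
  }

  setex(key: string, ttlSeconds: number, value: string): void {
    this.entries.set(key, { value, expiresAt: this.now() + ttlSeconds * 1000 });
  }

  get(key: string): string | null {
    const entry = this.entries.get(key);
    if (!entry) return null;
    if (this.now() >= entry.expiresAt) {
      this.entries.delete(key); // expired: behave like a cache miss
      return null;
    }
    return entry.value;
  }
}

// TTL tiers matching the three kinds of cached data described above.
const TTL_SECONDS = { stock: 5, leaders: 10, stats: 30 };

// Read-through lookup: serve from cache, otherwise load, store, and return.
async function readThrough<T>(
  cache: TtlCache,
  key: string,
  ttlSeconds: number,
  load: () => Promise<T>
): Promise<T> {
  const cached = cache.get(key);
  if (cached !== null) return JSON.parse(cached) as T;
  const fresh = await load();
  cache.setex(key, ttlSeconds, JSON.stringify(fresh));
  return fresh;
}
```

The shorter the TTL, the fresher the data and the more often the expensive loader runs, which is why per-stock entries expire quickly while the aggregate stats can live longer.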
Building the API Layer with Cloud Run
The API is a simple Express app deployed on Cloud Run. It serves cached data from Redis and falls back to BigQuery for historical queries.
import express from 'express';
import Redis from 'ioredis';
import { BigQuery } from '@google-cloud/bigquery';
const app = express();
const redis = new Redis(process.env.REDIS_URL);
const bigquery = new BigQuery();
app.get('/api/market/overview', async (req, res) => {
const cached = await redis.get('market:stats');
if (cached) {
return res.json(JSON.parse(cached));
}
// Cache miss - query BigQuery
const stats = await queryMarketStats();
await redis.setex('market:stats', 30, JSON.stringify(stats));
res.json(stats);
});
app.get('/api/stocks/bull', async (req, res) => {
const leaders = await redis.zrange('bull:leaders', 0, -1);
if (leaders.length > 0) {
return res.json(leaders.map(l => JSON.parse(l)));
}
// Fallback to BigQuery
const bullStocks = await queryBullStocks();
res.json(bullStocks);
});
app.get('/api/stocks/:symbol/performance', async (req, res) => {
const { symbol } = req.params;
const { days = 30 } = req.query;
// This always hits BigQuery - too many permutations to cache
const query = `
SELECT
timestamp,
price,
marketStatus
FROM \`project.dataset.market_data\`
WHERE symbol = @symbol
AND timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL @days DAY)
ORDER BY timestamp DESC
`;
const [rows] = await bigquery.query({
query,
// Parse with an explicit radix and fall back to 30 days on non-numeric input
params: { symbol, days: Number.parseInt(String(days), 10) || 30 }
});
res.json(rows);
});
const PORT = process.env.PORT || 8080;
app.listen(PORT, () => {
console.log(`API running on port ${PORT}`);
});
Real-Time Frontend with Server-Sent Events
For the Next.js dashboard, we used Server-Sent Events instead of WebSockets. Simpler, and it works great for one-way data flow.
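Part of why SSE is simpler is the wire format: each message is plain text, made of `field: value` lines and terminated by a blank line. The helper below is a minimal sketch of that framing; `formatSseMessage` and its options object are hypothetical names, not part of Next.js or any library.

```typescript
// Formats one Server-Sent Events message.
// A payload containing newlines needs one `data:` line per payload line;
// the blank line at the end tells the browser the message is complete.
function formatSseMessage(
  data: string,
  options: { event?: string; id?: string } = {}
): string {
  const lines: string[] = [];
  if (options.event) lines.push(`event: ${options.event}`);
  if (options.id) lines.push(`id: ${options.id}`);
  for (const line of data.split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`);
  }
  return lines.join('\n') + '\n\n';
}

// Lines starting with ':' are comments that EventSource ignores,
// which makes them usable as a heartbeat to keep idle connections open.
const SSE_HEARTBEAT = ': keep-alive\n\n';
```

A message without an `event` field is delivered to the browser's `onmessage` handler, which is all a one-way stats feed needs.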
// app/api/stream/route.ts
import { NextRequest } from 'next/server';
import Redis from 'ioredis';
const redis = new Redis(process.env.REDIS_URL!);
export async function GET(req: NextRequest) {
const encoder = new TextEncoder();
const stream = new ReadableStream({
async start(controller) {
const interval = setInterval(async () => {
try {
const stats = await redis.get('market:stats');
if (stats) {
const data = `data: ${stats}\n\n`;
controller.enqueue(encoder.encode(data));
}
} catch (error) {
console.error('Stream error:', error);
}
}, 2000);
req.signal.addEventListener('abort', () => {
clearInterval(interval);
controller.close();
});
}
});
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
});
}
On the client side, it's just a useEffect hook:
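// components/MarketDashboard.tsx
'use client'; // hooks require a Client Component under the App Router
import { useEffect, useState } from 'react';
// Shape of the `market:stats` payload, limited to the fields rendered below
interface MarketStats {
totalBull: number;
totalBear: number;
avgPerformance: number;
}
export default function MarketDashboard() {
const [stats, setStats] = useState<MarketStats | null>(null);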
useEffect(() => {
const eventSource = new EventSource('/api/stream');
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
setStats(data);
};
return () => eventSource.close();
}, []);
if (!stats) return <div>Loading...</div>
return (
<div>
<h2>S&P 500 Market Performance</h2>
<p>Bull: {stats.totalBull} | Bear: {stats.totalBear}</p>
<p>Avg Performance: {stats.avgPerformance.toFixed(2)}%</p>
</div>
);
}
Lessons Learned and Gotchas
Here's what I'd do differently next time:
1. Start with caching. I wasted a week optimizing BigQuery queries before realizing 90% of requests were identical. Redis should've been day one.
2. Pub/Sub is not free. We're paying about $50/month just for message throughput. For smaller projects, a simple cron job hitting an endpoint might be cheaper.
3. Market data APIs are expensive. Alpha Vantage's free tier is 5 requests/minute. We upgraded to a paid plan ($50/month) pretty quickly. Consider this in your budget.
4. BigQuery partitioning matters. We partition our table by date, which cut query costs by 60%. Without it, every query scans the entire table.
5. Error handling is critical. When a stock gets delisted or an API hiccups, your whole pipeline can stall. Always handle failures gracefully and log everything.
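For item 5, one common way to keep a transient API failure from stalling a run is to retry with exponential backoff and then give up cleanly. This is a minimal sketch, not the code we shipped: `withRetry`, `maxAttempts`, and `baseDelayMs` are hypothetical names, and the default values are arbitrary.

```typescript
// Retries `task` up to `maxAttempts` times, doubling the wait after each
// failure (baseDelayMs, 2x, 4x, ...). Rethrows the last error if every
// attempt fails, so the caller can log it and move on to the next symbol.
async function withRetry<T>(
  task: () => Promise<T>,
  maxAttempts: number = 3,
  baseDelayMs: number = 200
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await task();
    } catch (error) {
      lastError = error;
      if (attempt < maxAttempts) {
        const delayMs = baseDelayMs * Math.pow(2, attempt - 1);
        await new Promise<void>(resolve => setTimeout(resolve, delayMs));
      }
    }
  }
  throw lastError;
}
```

Wrapping a single quote fetch this way keeps one flaky symbol from failing its whole batch, while a permanently failing one (a delisted ticker, say) still surfaces as an error after the last attempt.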
The real performance win came from aggressive caching, not clever algorithms. Cache everything, invalidate intelligently.
Wrapping Up
Building a real-time market data pipeline taught me that serverless isn't just marketing hype - it genuinely works well for event-driven workloads. The combination of Pub/Sub, Cloud Functions, and Redis gave us a system that handles millions of data points per day for under $200/month in infrastructure costs.
The key takeaways:
- Use Pub/Sub for decoupling data ingestion from processing
- Cache aggressively with Redis to avoid expensive database queries
- BigQuery is great for analytics, terrible for real-time lookups
- Server-Sent Events are simpler than WebSockets for one-way data streams
- Always batch API requests to avoid rate limits
This architecture has been running in production for 8 months now with zero downtime. The bull market performance tracking works smoothly, and the client is happy. That's a win in my book.
