Overview
For algorithmic trading, fetching FX rates concurrently reduces latency and keeps signals timely. This guide shows how to use Python’s async/await with aiohttp to fetch rates efficiently, batch requests, and add production-ready features like retries and rate limiting.
Quickstart
- What you’ll build: an async client that fetches exchange rates concurrently.
- Tech: Python 3.10+, asyncio, aiohttp.
pip install aiohttp
Minimal Working Example (self-contained)
This script spins up a tiny local FX endpoint and an async client that queries it concurrently. No external services required.
import asyncio
import time
import aiohttp
from aiohttp import web
# --- Minimal local FX server (for demo/testing) ---
def create_app():
base_rates = { # synthetic mid rates relative to USD
"USD": 1.0,
"EUR": 0.9,
"JPY": 150.0,
"GBP": 0.78,
}
async def rates_handler(request: web.Request):
base = request.query.get("base", "USD").upper()
symbols_q = request.query.get("symbols", "").upper()
symbols = [s for s in symbols_q.split(",") if s]
if base not in base_rates:
return web.json_response({"error": "unknown base"}, status=400)
if not symbols:
symbols = list(base_rates.keys())
rates = {}
for sym in symbols:
if sym not in base_rates:
return web.json_response({"error": f"unknown symbol {sym}"}, status=400)
rate = base_rates[sym] / base_rates[base]
rates[sym] = round(rate, 6)
return web.json_response({
"base": base,
"rates": rates,
"ts": int(time.time()),
})
app = web.Application()
app.router.add_get("/latest", rates_handler)
return app
class LocalRateServer:
def __init__(self, host="127.0.0.1", port=8081):
self.host = host
self.port = port
self.runner = None
async def __aenter__(self):
app = create_app()
self.runner = web.AppRunner(app)
await self.runner.setup()
site = web.TCPSite(self.runner, self.host, self.port)
await site.start()
return f"http://{self.host}:{self.port}"
async def __aexit__(self, exc_type, exc, tb):
if self.runner:
await self.runner.cleanup()
# --- Async client ---
async def fetch_rates(session: aiohttp.ClientSession, base_url: str, base: str, symbols: list[str]) -> dict:
params = {"base": base.upper(), "symbols": ",".join(s.upper() for s in symbols)}
timeout = aiohttp.ClientTimeout(total=5)
async with session.get(f"{base_url}/latest", params=params, timeout=timeout) as resp:
resp.raise_for_status()
data = await resp.json()
return data["rates"]
async def main():
async with LocalRateServer() as base_url:
async with aiohttp.ClientSession() as session:
tasks = [
fetch_rates(session, base_url, "USD", ["EUR", "JPY", "GBP"]),
fetch_rates(session, base_url, "EUR", ["USD", "JPY"]),
]
usd_rates, eur_rates = await asyncio.gather(*tasks)
print("USD base:", usd_rates)
print("EUR base:", eur_rates)
if __name__ == "__main__":
asyncio.run(main())
What it demonstrates:
- Single shared ClientSession (connection pooling).
- Concurrent requests with asyncio.gather.
- Query parameters for base and symbol list.
Step-by-step
- Install aiohttp and choose Python 3.10+ for typing and asyncio improvements.
- Reuse a single aiohttp.ClientSession per process (or per task group) to avoid reconnect overhead.
- Add a total timeout per request; handle exceptions (ClientConnectorError, asyncio.TimeoutError).
- Batch requests with asyncio.gather to minimize wall-clock time.
- Normalize output into a standard dict: {(base, quote): rate} for ease of use in trading logic.
- Add retries with exponential backoff for transient issues.
- Limit concurrency with a Semaphore to respect vendor rate limits.
Adding retries and backoff (production pattern)
import random
from aiohttp import ClientError
async def get_json_with_retries(session, url, params, *, attempts=3, base_delay=0.25):
for i in range(attempts):
try:
async with session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=5)) as resp:
resp.raise_for_status()
return await resp.json()
except (ClientError, asyncio.TimeoutError) as e:
if i == attempts - 1:
raise
# jittered exponential backoff
await asyncio.sleep((2 ** i) * base_delay * (1 + random.random() * 0.25))
Limiting concurrency
sem = asyncio.Semaphore(10) # tune per provider limits and latency
async def limited_fetch(session, base_url, base, symbols):
async with sem:
return await fetch_rates(session, base_url, base, symbols)
Integrating into a trading system
- Normalize pairs: store as tuples (base, quote) in uppercase.
- Attach timestamps to every rate; reject stale data beyond your SLA.
- Convert rates to Decimal for PnL-sensitive math; keep floats for speed in non-critical paths.
- Cache hot bases (e.g., USD) with a short TTL (e.g., 1–5 seconds) to reduce calls.
Example normalization snippet:
from decimal import Decimal
def normalize_rates(base: str, rates: dict[str, float]) -> dict[tuple[str, str], Decimal]:
out = {}
for quote, v in rates.items():
out[(base.upper(), quote.upper())] = Decimal(str(v))
return out
Pitfalls to avoid
- Blocking the event loop: do not call time.sleep or heavy CPU work; use asyncio.sleep or move CPU-bound tasks to a ProcessPool.
- No timeouts: always set client timeouts to avoid hanging tasks.
- Unbounded concurrency: hammering the API triggers bans; use semaphores and request pacing.
- Ignoring currency case: normalize to uppercase; reject unknown symbols.
- Floating-point surprises: prefer Decimal for accounting; keep conversions at the boundaries.
- Mixed timestamps: ensure all times are UTC and comparable; discard stale snapshots.
Performance notes
- Connection reuse: one ClientSession per service; reuse across tasks.
- Batch by base: requesting multiple symbols for the same base is cheaper than per-pair calls.
- Tune concurrency: start low (5–10), measure p95 latency, then adjust.
- Jitter backoff: reduces herd effects during provider incidents.
- Caching tiers: in-memory TTL cache first; optional shared cache (Redis) for multi-process bots.
- Parsing cost: JSON is fine for small batches; for very large symbol sets, prefer compact responses and avoid repeated parsing.
Testing and determinism
- Use the included local server for unit tests; seed synthetic rates for deterministic scenarios.
- Mock time and the network layer to test retry and timeout logic.
- Validate cross rates: r(A->B) ≈ 1 / r(B->A) within tolerance; alert on violations.
Small FAQ
How many concurrent requests should I run? Start with 5–10 and measure error rates and latency. Obey your vendor’s published limits.
Should I use WebSockets instead? For tick-by-tick streaming or frequent updates, yes. Use HTTP polling for occasional snapshots.
Float or Decimal for rates? Use Decimal for PnL and order sizing; float is acceptable for lightweight signal logic.
How do I detect stale data? Compare server timestamps to UTC now and set a maximum age threshold per strategy.
Can I share a ClientSession across tasks? Yes. Create it once and pass it down; close it at shutdown with an async context manager.