Fetch Exchange Rates in Python with asyncio and aiohttp

Last updated: October 03, 2025

Overview

For algorithmic trading, fetching FX rates concurrently reduces latency and keeps signals timely. This guide shows how to use Python’s async/await with aiohttp to fetch rates efficiently, batch requests, and add production-ready features like retries and rate limiting.

Quickstart

  • What you’ll build: an async client that fetches exchange rates concurrently.
  • Tech: Python 3.10+, asyncio, aiohttp.
pip install aiohttp

Minimal Working Example (self-contained)

This script spins up a tiny local FX endpoint and an async client that queries it concurrently. No external services required.

import asyncio
import time
import aiohttp
from aiohttp import web

# --- Minimal local FX server (for demo/testing) ---

def create_app():
    base_rates = {  # synthetic mid rates relative to USD
        "USD": 1.0,
        "EUR": 0.9,
        "JPY": 150.0,
        "GBP": 0.78,
    }

    async def rates_handler(request: web.Request):
        base = request.query.get("base", "USD").upper()
        symbols_q = request.query.get("symbols", "").upper()
        symbols = [s for s in symbols_q.split(",") if s]
        if base not in base_rates:
            return web.json_response({"error": "unknown base"}, status=400)
        if not symbols:
            symbols = list(base_rates.keys())
        rates = {}
        for sym in symbols:
            if sym not in base_rates:
                return web.json_response({"error": f"unknown symbol {sym}"}, status=400)
            rate = base_rates[sym] / base_rates[base]
            rates[sym] = round(rate, 6)
        return web.json_response({
            "base": base,
            "rates": rates,
            "ts": int(time.time()),
        })

    app = web.Application()
    app.router.add_get("/latest", rates_handler)
    return app

class LocalRateServer:
    def __init__(self, host="127.0.0.1", port=8081):
        self.host = host
        self.port = port
        self.runner = None

    async def __aenter__(self):
        app = create_app()
        self.runner = web.AppRunner(app)
        await self.runner.setup()
        site = web.TCPSite(self.runner, self.host, self.port)
        await site.start()
        return f"http://{self.host}:{self.port}"

    async def __aexit__(self, exc_type, exc, tb):
        if self.runner:
            await self.runner.cleanup()

# --- Async client ---

async def fetch_rates(session: aiohttp.ClientSession, base_url: str, base: str, symbols: list[str]) -> dict:
    params = {"base": base.upper(), "symbols": ",".join(s.upper() for s in symbols)}
    timeout = aiohttp.ClientTimeout(total=5)
    async with session.get(f"{base_url}/latest", params=params, timeout=timeout) as resp:
        resp.raise_for_status()
        data = await resp.json()
        return data["rates"]

async def main():
    async with LocalRateServer() as base_url:
        async with aiohttp.ClientSession() as session:
            tasks = [
                fetch_rates(session, base_url, "USD", ["EUR", "JPY", "GBP"]),
                fetch_rates(session, base_url, "EUR", ["USD", "JPY"]),
            ]
            usd_rates, eur_rates = await asyncio.gather(*tasks)
            print("USD base:", usd_rates)
            print("EUR base:", eur_rates)

if __name__ == "__main__":
    asyncio.run(main())

What it demonstrates:

  • Single shared ClientSession (connection pooling).
  • Concurrent requests with asyncio.gather.
  • Query parameters for base and symbol list.

Step-by-step

  1. Install aiohttp and choose Python 3.10+ for typing and asyncio improvements.
  2. Reuse a single aiohttp.ClientSession per process (or per task group) to avoid reconnect overhead.
  3. Add a total timeout per request; handle exceptions (ClientConnectorError, asyncio.TimeoutError) as shown in the sketch after this list.
  4. Batch requests with asyncio.gather to minimize wall-clock time.
  5. Normalize output into a standard dict: {(base, quote): rate} for ease of use in trading logic.
  6. Add retries with exponential backoff for transient issues.
  7. Limit concurrency with a Semaphore to respect vendor rate limits.
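
A minimal sketch of per-task error handling (step 3), assuming the fetch_rates helper and local server from the example above. The fetch_many name is illustrative, not an aiohttp API; gather with return_exceptions=True keeps one failed request from cancelling the rest:

from aiohttp import ClientConnectorError, ClientResponseError

async def fetch_many(session, base_url, requests):
    # requests: list of (base, symbols) pairs, e.g. [("USD", ["EUR", "JPY"])]
    tasks = [fetch_rates(session, base_url, b, syms) for b, syms in requests]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    ok, failed = {}, {}
    for (b, _), res in zip(requests, results):
        if isinstance(res, (ClientConnectorError, ClientResponseError, asyncio.TimeoutError)):
            failed[b] = res  # network/HTTP/timeout errors: record and move on
        elif isinstance(res, Exception):
            raise res        # unexpected errors should surface, not be silenced
        else:
            ok[b] = res
    return ok, failed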

Adding retries and backoff (production pattern)

import random
from aiohttp import ClientError

async def get_json_with_retries(session, url, params, *, attempts=3, base_delay=0.25):
    for i in range(attempts):
        try:
            async with session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=5)) as resp:
                resp.raise_for_status()
                return await resp.json()
        except (ClientError, asyncio.TimeoutError) as e:
            if i == attempts - 1:
                raise
            # jittered exponential backoff
            await asyncio.sleep((2 ** i) * base_delay * (1 + random.random() * 0.25))
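
To plug this into the client, the direct session.get call in fetch_rates can be swapped for the retrying helper; a sketch assuming the same endpoint and parameters as the minimal example:

async def fetch_rates_with_retries(session, base_url, base, symbols):
    params = {"base": base.upper(), "symbols": ",".join(s.upper() for s in symbols)}
    data = await get_json_with_retries(session, f"{base_url}/latest", params)
    return data["rates"]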

Limiting concurrency

sem = asyncio.Semaphore(10)  # tune per provider limits and latency

async def limited_fetch(session, base_url, base, symbols):
    async with sem:
        return await fetch_rates(session, base_url, base, symbols)
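
Fanning many bases out through the limiter still uses a single gather call; at most 10 requests are in flight at once, regardless of how many bases are passed. A sketch reusing fetch_rates from the minimal example:

async def fetch_all_bases(session, base_url, bases, symbols):
    tasks = [limited_fetch(session, base_url, b, symbols) for b in bases]
    results = await asyncio.gather(*tasks)
    return dict(zip(bases, results))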

Integrating into a trading system

  • Normalize pairs: store as tuples (base, quote) in uppercase.
  • Attach timestamps to every rate; reject stale data beyond your SLA.
  • Convert rates to Decimal for PnL-sensitive math; keep floats for speed in non-critical paths.
  • Cache hot bases (e.g., USD) with a short TTL (e.g., 1–5 seconds) to reduce calls.
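
A minimal in-memory TTL cache for hot bases (RateCache is an illustrative name, not a library class); check get() before issuing a request and call put() after a successful fetch:

import time

class RateCache:
    def __init__(self, ttl_seconds: float = 2.0):
        self.ttl = ttl_seconds
        self._store: dict[tuple, tuple[float, dict]] = {}  # key -> (monotonic_ts, rates)

    def _key(self, base: str, symbols) -> tuple:
        return (base.upper(), tuple(sorted(s.upper() for s in symbols)))

    def get(self, base: str, symbols):
        entry = self._store.get(self._key(base, symbols))
        if entry and (time.monotonic() - entry[0]) < self.ttl:
            return entry[1]
        return None

    def put(self, base: str, symbols, rates: dict):
        self._store[self._key(base, symbols)] = (time.monotonic(), rates)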

Example normalization snippet:

from decimal import Decimal

def normalize_rates(base: str, rates: dict[str, float]) -> dict[tuple[str, str], Decimal]:
    out = {}
    for quote, v in rates.items():
        out[(base.upper(), quote.upper())] = Decimal(str(v))
    return out
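
For example, with the synthetic USD rates from the demo server:

rates = {"EUR": 0.9, "JPY": 150.0}
print(normalize_rates("usd", rates))
# {('USD', 'EUR'): Decimal('0.9'), ('USD', 'JPY'): Decimal('150.0')}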

Pitfalls to avoid

  • Blocking the event loop: do not call time.sleep or run heavy CPU work inside coroutines; use asyncio.sleep and move CPU-bound tasks to a ProcessPoolExecutor.
  • No timeouts: always set client timeouts to avoid hanging tasks.
  • Unbounded concurrency: hammering the API triggers bans; use semaphores and request pacing.
  • Ignoring currency case: normalize to uppercase; reject unknown symbols.
  • Floating-point surprises: prefer Decimal for accounting; keep conversions at the boundaries.
  • Mixed timestamps: ensure all times are UTC and comparable; discard stale snapshots.
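
A minimal staleness check against the ts field returned by the demo server; the 5-second threshold is an arbitrary example and should be tuned per strategy:

import time

MAX_AGE_SECONDS = 5.0

def is_fresh(snapshot: dict, max_age: float = MAX_AGE_SECONDS) -> bool:
    # snapshot["ts"] is a Unix timestamp in seconds (UTC), as returned by /latest
    return (time.time() - snapshot["ts"]) <= max_age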

Performance notes

  • Connection reuse: one ClientSession per service; reuse across tasks.
  • Batch by base: requesting multiple symbols for the same base is cheaper than per-pair calls.
  • Tune concurrency: start low (5–10), measure p95 latency (see the timing sketch after this list), then adjust.
  • Jitter backoff: reduces herd effects during provider incidents.
  • Caching tiers: in-memory TTL cache first; optional shared cache (Redis) for multi-process bots.
  • Parsing cost: JSON is fine for small batches; for very large symbol sets, prefer compact responses and avoid repeated parsing.
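
A quick way to measure per-request latency and a p95 figure against the demo server (a measurement sketch reusing limited_fetch, not a benchmarking harness):

import statistics
import time

async def measure_p95(session, base_url, n=50):
    async def timed():
        start = time.perf_counter()
        await limited_fetch(session, base_url, "USD", ["EUR", "JPY", "GBP"])
        return time.perf_counter() - start

    latencies = await asyncio.gather(*(timed() for _ in range(n)))
    return statistics.quantiles(latencies, n=20)[-1]  # 19 cut points; the last is the 95th percentile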

Testing and determinism

  • Use the included local server for unit tests; seed synthetic rates for deterministic scenarios.
  • Mock time and the network layer to test retry and timeout logic.
  • Validate cross rates: r(A->B) ≈ 1 / r(B->A) within tolerance; alert on violations.
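
A minimal cross-rate consistency check, assuming two snapshots fetched with fetch_rates; the tolerance is an illustration and should reflect the provider's quoting precision:

def check_inverse(rate_ab: float, rate_ba: float, tol: float = 1e-6) -> bool:
    # r(A->B) * r(B->A) should be ~1 within tolerance
    return abs(rate_ab * rate_ba - 1.0) <= tol

# e.g. check_inverse(usd_rates["EUR"], eur_rates["USD"]) with the gather results from main()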

Small FAQ

  • How many concurrent requests should I run? Start with 5–10 and measure error rates and latency. Obey your vendor’s published limits.

  • Should I use WebSockets instead? For tick-by-tick streaming or frequent updates, yes. Use HTTP polling for occasional snapshots.

  • Float or Decimal for rates? Use Decimal for PnL and order sizing; float is acceptable for lightweight signal logic.

  • How do I detect stale data? Compare server timestamps to UTC now and set a maximum age threshold per strategy.

  • Can I share a ClientSession across tasks? Yes. Create it once and pass it down; close it at shutdown with an async context manager.


Series: Algorithmic Trading with Python
