Overview
Fetch live gold and silver quotes concurrently with Python’s async/await. We’ll query a public JSON endpoint that returns currency and metal rates and extract USD prices for XAU (gold) and XAG (silver). This is suitable for lightweight algotrading prototypes, monitoring, and backtesting warmups.
Key points:
- Concurrency with aiohttp to get XAUUSD and XAGUSD in parallel.
- Simple, dependency-light code.
- Notes on accuracy, retries, and performance for trading systems.
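Data source: a public JSON endpoint (exchangerate.host) that accepts XAU and XAG as base currencies. The examples assume no API key is required; check the provider’s current documentation before relying on that.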
Quickstart
Prerequisites
- Python 3.10+
- A terminal and pip
Install dependencies
- pip install aiohttp
Save the minimal script below to fetch both prices concurrently.
Run
- python fetch_metals.py
Extend
- Add retries, periodic scheduling, unit conversions, and persistence as needed.
Minimal working example
This script concurrently fetches USD prices for 1 troy ounce of gold (XAU) and silver (XAG) using exchangerate.host.
# file: fetch_metals.py
import asyncio
from datetime import datetime, timezone

import aiohttp

BASE_URL = "https://api.exchangerate.host/latest"


async def fetch_usd_per_ounce(session: aiohttp.ClientSession, base_code: str) -> dict:
    """Fetch USD price for 1 unit of `base_code` (XAU or XAG)."""
    params = {"base": base_code, "symbols": "USD"}
    async with session.get(BASE_URL, params=params, timeout=aiohttp.ClientTimeout(total=5)) as resp:
        resp.raise_for_status()
        data = await resp.json()
        price = float(data["rates"]["USD"])  # USD per 1 unit of base_code
        return {
            "symbol": f"{base_code}USD",
            "price": price,
            "asof": data.get("date"),  # ISO date string from source
            "fetched_at": datetime.now(timezone.utc).isoformat(),
        }


async def main():
    headers = {"User-Agent": "metals-fetcher/1.0"}
    async with aiohttp.ClientSession(headers=headers) as session:
        # Schedule both requests before awaiting either, so they run concurrently.
        xau_task = asyncio.create_task(fetch_usd_per_ounce(session, "XAU"))
        xag_task = asyncio.create_task(fetch_usd_per_ounce(session, "XAG"))
        xau, xag = await asyncio.gather(xau_task, xag_task)
    for quote in (xau, xag):
        print(f"{quote['symbol']}: ${quote['price']:.2f} (asof={quote['asof']}, fetched={quote['fetched_at']})")


if __name__ == "__main__":
    asyncio.run(main())
Expected output (values will vary):
- XAUUSD: $2345.67 (asof=2025-10-03, fetched=2025-10-03T12:34:56+00:00)
- XAGUSD: $29.45 (asof=2025-10-03, fetched=2025-10-03T12:34:56+00:00)
Steps explained
- Create a single aiohttp ClientSession to reuse the TCP connection.
- Launch two concurrent coroutines to fetch XAUUSD and XAGUSD.
- Parse the JSON and extract rates["USD"]; this is USD per ounce for the base (XAU or XAG).
- Print formatted results with timestamps for auditability.
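To see why launching both coroutines together matters, here is a minimal sketch that swaps the network call for a simulated delay. The names fake_fetch and timed_gather are hypothetical, the 0.3 second latency is an arbitrary assumption, and the prices are placeholders rather than real data.

```python
import asyncio
import time


async def fake_fetch(symbol: str, delay_s: float) -> dict:
    """Stand-in for a network call: waits, then returns a placeholder quote."""
    await asyncio.sleep(delay_s)
    return {"symbol": symbol, "price": 0.0}  # placeholder, not a real price


async def timed_gather() -> tuple:
    """Run two simulated fetches concurrently and measure the wall time."""
    start = time.perf_counter()
    quotes = await asyncio.gather(
        fake_fetch("XAUUSD", 0.3),
        fake_fetch("XAGUSD", 0.3),
    )
    return quotes, time.perf_counter() - start


quotes, elapsed = asyncio.run(timed_gather())
# gather preserves argument order, so XAUUSD comes first.
print([q["symbol"] for q in quotes], f"{elapsed:.2f}s")
```

Run sequentially, the two waits would add up to about 0.6 seconds; with gather the total should stay close to the slower of the two.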
Production additions: retries and interval polling
The snippet below adds exponential backoff, jitter, and a simple scheduler to poll every N seconds without overlapping runs.
# file: metals_polling.py
import asyncio
import random
from datetime import datetime, timezone

import aiohttp

BASE_URL = "https://api.exchangerate.host/latest"


async def fetch_with_retries(session, base_code: str, *, attempts: int = 3) -> dict:
    params = {"base": base_code, "symbols": "USD"}
    for i in range(attempts):
        try:
            async with session.get(BASE_URL, params=params, timeout=aiohttp.ClientTimeout(total=5)) as resp:
                # Handle polite rate limiting
                if resp.status in (429, 503):
                    raise aiohttp.ClientResponseError(resp.request_info, resp.history, status=resp.status)
                resp.raise_for_status()
                data = await resp.json()
                price = float(data["rates"]["USD"])
                return {
                    "symbol": f"{base_code}USD",
                    "price": price,
                    "asof": data.get("date"),
                    "fetched_at": datetime.now(timezone.utc).isoformat(),
                }
        except (aiohttp.ClientError, asyncio.TimeoutError):
            if i == attempts - 1:
                raise  # out of attempts: surface the last error to the caller
            # Exponential backoff (0.4s, 0.8s, ...) plus up to 0.2s of jitter
            backoff = 0.4 * (2 ** i) + random.uniform(0, 0.2)
            await asyncio.sleep(backoff)


async def poll_metals(interval_s: float = 15.0):
    headers = {"User-Agent": "metals-poller/1.0"}
    connector = aiohttp.TCPConnector(limit=10, ttl_dns_cache=300)
    loop = asyncio.get_running_loop()  # monotonic clock source for scheduling
    async with aiohttp.ClientSession(headers=headers, connector=connector) as session:
        while True:
            start = loop.time()
            xau, xag = await asyncio.gather(
                fetch_with_retries(session, "XAU"),
                fetch_with_retries(session, "XAG"),
                return_exceptions=True,
            )
            for res in (xau, xag):
                if isinstance(res, Exception):
                    print(f"warn: fetch failed: {res}")
                else:
                    print(f"{res['symbol']}: ${res['price']:.2f} (asof={res['asof']}, fetched={res['fetched_at']})")
            # Sleep the remainder to avoid overlapping
            elapsed = loop.time() - start
            await asyncio.sleep(max(0.0, interval_s - elapsed))


if __name__ == "__main__":
    asyncio.run(poll_metals(15))
Interpreting the prices
- XAUUSD: USD per 1 troy ounce of gold.
- XAGUSD: USD per 1 troy ounce of silver.
- The response date is the provider’s reference date (day resolution only); fetched_at is the UTC time on your machine when the response was processed.
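Because both symbols are quoted per troy ounce, converting to metric units is a single division. The sketch below assumes Decimal inputs; the helper names usd_per_gram and usd_per_kilogram are hypothetical, and the sample value is the illustrative figure from the expected output above, not a live quote.

```python
from decimal import Decimal

# Exact by definition: 1 troy ounce = 31.1034768 grams.
GRAMS_PER_TROY_OUNCE = Decimal("31.1034768")


def usd_per_gram(usd_per_troy_ounce: Decimal) -> Decimal:
    """Convert a per-troy-ounce price to a per-gram price."""
    return usd_per_troy_ounce / GRAMS_PER_TROY_OUNCE


def usd_per_kilogram(usd_per_troy_ounce: Decimal) -> Decimal:
    """Convert a per-troy-ounce price to a per-kilogram price."""
    return usd_per_gram(usd_per_troy_ounce) * 1000


sample = Decimal("2345.67")  # illustrative value only
print(usd_per_gram(sample).quantize(Decimal("0.01")))
```

Quantizing only at display time keeps the intermediate value at full Decimal precision.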
Pitfalls to avoid
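- Precision for trading: float can introduce rounding errors. Use Decimal for order sizing and PnL, and build each Decimal from the raw JSON number text (for example json.loads(text, parse_float=Decimal)) or from str(value); Decimal(float_value) copies the binary rounding error instead of removing it.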
- Latency and freshness: “latest” endpoints can be delayed by up to minutes. For latency-sensitive execution, use a low-latency paid feed and colocated infra.
- Rate limiting: Handle HTTP 429/503 with backoff. Add a per-host concurrency cap.
- Timeouts: Always set explicit client timeouts; aiohttp’s default total timeout is 5 minutes, long enough to stall a polling loop.
- Units: XAU/XAG are troy ounces, not grams. Convert correctly when needed.
- Clock drift: Ensure your system clock is synced (e.g., NTP) so timestamps align.
- Error handling: Guard against missing keys or unexpected schemas; log and alert rather than silently failing.
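The precision and error-handling points above can be combined in one parsing step. This is a minimal sketch that assumes the payload shape used in this article (a "rates" object containing "USD"); QuoteSchemaError and parse_usd_rate are hypothetical names, not part of any library.

```python
import json
from decimal import Decimal


class QuoteSchemaError(ValueError):
    """Raised when a payload lacks the fields this sketch expects."""


def parse_usd_rate(raw_body: str) -> Decimal:
    """Extract rates["USD"] as a Decimal without passing through float."""
    try:
        # parse_float/parse_int receive the number's original text,
        # so no binary rounding is introduced.
        payload = json.loads(raw_body, parse_float=Decimal, parse_int=Decimal)
        rate = payload["rates"]["USD"]
    except (json.JSONDecodeError, KeyError, TypeError) as exc:
        raise QuoteSchemaError(f"unexpected payload: {exc!r}") from exc
    # Reject non-numeric, non-finite, or non-positive values.
    if not isinstance(rate, Decimal) or not rate.is_finite() or rate <= 0:
        raise QuoteSchemaError(f"implausible USD rate: {rate!r}")
    return rate


print(parse_usd_rate('{"rates": {"USD": 2345.67}, "date": "2025-10-03"}'))
```

With aiohttp, the raw body would come from await resp.text() rather than resp.json(); callers can then catch QuoteSchemaError, log it, and alert instead of failing silently.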
Performance notes
- Session reuse: Keep one ClientSession for the app lifetime to benefit from TCP/TLS reuse.
- Connection pooling: Configure TCPConnector(limit=...) to bound concurrency and avoid socket exhaustion.
- Batching: Here, bases differ (XAU vs XAG), so two requests are necessary. If your provider supports multi-symbol for the same base, prefer one batch call.
- Backoff strategy: Add jitter to avoid thundering herds. Use exponential backoff with caps.
- Event loop: Avoid blocking calls (I/O, CPU) on the loop. Offload blocking I/O with asyncio.to_thread; for heavy CPU work (e.g., signal calc), prefer a process pool, since threads still contend for the GIL.
- Metrics: Track latency, error rates, and data freshness; trigger alerts on anomalies.
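For the backoff note above, one possible shape is a capped exponential ceiling with the whole delay drawn at random below it (often called full jitter). This differs from the additive jitter used in metals_polling.py; the function name backoff_delay and the 0.4 second base and 10 second cap are assumptions chosen for illustration.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    *,
    base_s: float = 0.4,
    cap_s: float = 10.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Return a randomized delay for a zero-based attempt number.

    The ceiling doubles with each attempt but never exceeds cap_s;
    the returned delay is uniform between 0 and that ceiling.
    """
    rng = rng if rng is not None else random.Random()
    ceiling = min(cap_s, base_s * (2 ** attempt))
    return rng.uniform(0.0, ceiling)


# Ceilings for the first six attempts under the default settings.
print([min(10.0, 0.4 * (2 ** i)) for i in range(6)])
```

Passing a seeded random.Random makes the schedule reproducible in tests, while production callers can leave rng unset.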
Tiny FAQ
Is this truly “real-time”?
- It’s near-real-time and can be delayed. For execution-grade trading, use a dedicated market data provider.
Can I use httpx instead of aiohttp?
- Yes. httpx provides an async client with a similar interface. Keep the same concurrency patterns and timeouts.
How often should I poll?
- Match provider limits and your strategy needs. 5–30 seconds is typical for monitoring; streaming/tick feeds are preferred for execution.
How do I integrate with my trading bot?
- Wrap the fetcher in a service that publishes quotes to your event bus or cache, then have strategy and execution actors consume from there.
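As a rough illustration of that last answer, the sketch below uses an in-process asyncio.Queue as a stand-in for an event bus. The names publish_quotes, consume_quotes, and demo are hypothetical, the None sentinel is one convention among several, and the quote values are made up.

```python
import asyncio


async def publish_quotes(queue: asyncio.Queue, quotes: list) -> None:
    """Producer: push each quote, then a None sentinel to signal completion."""
    for quote in quotes:
        await queue.put(quote)
    await queue.put(None)


async def consume_quotes(queue: asyncio.Queue) -> dict:
    """Consumer: keep the most recent quote per symbol, like a tiny cache."""
    latest = {}
    while (quote := await queue.get()) is not None:
        latest[quote["symbol"]] = quote
    return latest


async def demo() -> dict:
    # A bounded queue makes the producer wait when the consumer falls behind.
    queue = asyncio.Queue(maxsize=100)
    sample = [  # made-up values for illustration only
        {"symbol": "XAUUSD", "price": 2345.67},
        {"symbol": "XAGUSD", "price": 29.45},
        {"symbol": "XAUUSD", "price": 2346.10},
    ]
    _, latest = await asyncio.gather(
        publish_quotes(queue, sample),
        consume_quotes(queue),
    )
    return latest


latest = asyncio.run(demo())
print(latest)
```

In a real bot the producer would be the polling loop, and the consumer side would be the strategy and execution components reading from a shared bus or cache.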