API Reference
Complete documentation for the PaperBrokerClient Python API — FIX 4.4 trading plus live alpha framework.
Configuration
Configure the client using constructor parameters. QuickFIX config files are auto-generated.
FIX Connection
| Parameter | Type | Description | Default |
|---|---|---|---|
| socket_connect_host | str | FIX server hostname or IP | "localhost" |
| socket_connect_port | int | FIX server port | 5001 |
| sender_comp_id | str | Your FIX client identifier | "CLIENT" |
| target_comp_id | str | Server FIX identifier | "SERVER" |
Authentication
| Parameter | Type | Description | Required |
|---|---|---|---|
| username | str | Trading account username | Required |
| password | str | Trading account password | Required |
| default_sub_account | str | Default sub-account ID for orders | Required |
REST API & Logging
| Parameter | Type | Description | Default |
|---|---|---|---|
| rest_base_url | str | Base URL for REST API endpoints | "http://localhost:9090" |
| log_dir | str | Directory for log files | "logs" |
| console | bool | Output logs to console | False |
| order_store_path | str \| None | Path to SQLite file for order persistence (v0.2.4+). Set None for memory-only mode. Use a distinct path per running process. | "orders.db" |
| enable_fix | bool \| None | FIX mode toggle (v0.2.5+). None auto-detects QuickFIX availability. False = REST-only mode (macOS arm64 / no QuickFIX); order methods raise FIXNotEnabledError. | None |
| cfg_path | str \| None | Path to a custom QuickFIX .cfg. If set, disables auto-generation. | None |
| auto_generate_cfg | bool | Auto-create QuickFIX config file from constructor kwargs. | True |
| save_cfg_to | str \| None | If set, write the generated config to this path (debugging aid). | None |
Client attributes
| Attribute | Type | Notes |
|---|---|---|
| client.orders | OrderManager \| None | Direct order facade (FIX mode only). None when enable_fix=False. |
| client.accounting | AccountClient | REST account client (always present). |
| client.is_fix_enabled | bool | Property that reflects the auto-detect outcome. |
| client.session | FIXEngine \| None | Back-compat alias for the FIX engine driving client.orders. |
Security: Always use environment variables for credentials. Never hardcode passwords. Add .env to .gitignore.
Environment Variables
# .env file
# --- FIX client (validated by SignalDrivenAlpha.from_paper) ---
PAPER_USERNAME=your-username
PAPER_PASSWORD=your-password
PAPER_REST_BASE_URL=http://localhost:9090
SOCKET_HOST=your-fix-server
SOCKET_PORT=5001
SENDER_COMP_ID=your-sender-id
TARGET_COMP_ID=SERVER
# --- Market data: Redis (default backend) ---
MARKET_REDIS_HOST=your-redis-host
MARKET_REDIS_PORT=6379
MARKET_REDIS_PASSWORD=your-redis-password # optional
# --- Market data: Kafka (recommended for multi-instrument alphas) ---
PAPERBROKER_KAFKA_BOOTSTRAP_SERVERS=host:port
PAPERBROKER_KAFKA_USERNAME=sasl-user
PAPERBROKER_KAFKA_PASSWORD=sasl-pass
PAPERBROKER_ENV_ID=real
# --- Convention (read by examples, not validated) ---
PAPER_ACCOUNT_ID=main
PAPER_ACCOUNT_ID_D2=V001 # example 03 only
VN30F1M=HNXDS:VN30F2606 # examples 09-14
VN30F2M=HNXDS:VN30F2609 # example 12 pair-spread
EQUITY=HSX:MWG # example 07
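A minimal sketch of turning the variables above into constructor keyword arguments. The helper name `client_kwargs_from_env` is hypothetical, and both the choice of which variables to treat as required and the mapping of `PAPER_ACCOUNT_ID` to `default_sub_account` are assumptions made for illustration. The parameter names themselves come from the Configuration tables.

```python
import os

# Assumed mapping: env var name (from the .env above) -> constructor parameter
# name (from the Configuration tables). Not part of the library.
ENV_TO_PARAM = {
    "PAPER_USERNAME": "username",
    "PAPER_PASSWORD": "password",
    "PAPER_REST_BASE_URL": "rest_base_url",
    "SOCKET_HOST": "socket_connect_host",
    "SOCKET_PORT": "socket_connect_port",
    "SENDER_COMP_ID": "sender_comp_id",
    "TARGET_COMP_ID": "target_comp_id",
    "PAPER_ACCOUNT_ID": "default_sub_account",  # assumption: convention var used as default
}


def client_kwargs_from_env(env=None):
    """Build constructor kwargs from an environment mapping (hypothetical helper)."""
    env = os.environ if env is None else env
    missing = [name for name in ENV_TO_PARAM if not env.get(name)]
    if missing:
        # Report every missing variable at once rather than failing one by one.
        raise EnvironmentError("Missing environment variables: " + ", ".join(missing))
    kwargs = {param: env[name] for name, param in ENV_TO_PARAM.items()}
    kwargs["socket_connect_port"] = int(kwargs["socket_connect_port"])  # table type is int
    return kwargs
```

The resulting dict can be unpacked into the client constructor, which keeps credentials out of source code.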
Connection
Lifecycle methods for managing FIX session connection.
connect(): Start the FIX session and REST authentication. Non-blocking.
disconnect(): Stop the FIX session and close all connections gracefully.
wait_until_logged_on(timeout): Block until the FIX session is established or the timeout expires. Returns True if logged on, False on timeout.
Check whether the FIX session is currently active.
last_logon_error(): Get the last authentication error message if login failed.
Last server-side logout reason captured from Logout(58); symmetric with last_logon_error(). Useful for "why did my session drop" diagnostics.
Usage
client.connect()
if client.wait_until_logged_on(timeout=10):
    print("✅ Connected!")
    # ... trading logic ...
    client.disconnect()
else:
    print(f"❌ Failed: {client.last_logon_error()}")
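Because connect() is non-blocking and wait_until_logged_on() reports failure only by returning False, a retry loop is a natural wrapper. The sketch below is one possible shape, not part of the library: `connect_with_retry` and its `attempts` / `backoff` parameters are hypothetical, and calling disconnect() between attempts is an assumption about what a clean reset looks like.

```python
import time


def connect_with_retry(client, attempts=3, timeout=10.0, backoff=1.0):
    """Try to log on up to `attempts` times; return True on success."""
    for attempt in range(1, attempts + 1):
        client.connect()
        if client.wait_until_logged_on(timeout=timeout):
            return True
        print(f"Attempt {attempt} failed: {client.last_logon_error()}")
        client.disconnect()  # assumption: reset the session before retrying
        if attempt < attempts:
            time.sleep(backoff * attempt)  # linear backoff between attempts
    return False
```

It relies only on the four lifecycle methods shown in the usage example, so any object exposing them can be passed in.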
Order Management
Methods for placing, canceling, and tracking orders via FIX protocol.
place_order(...): Submit a new order to the exchange. Returns cl_ord_id, the client order ID used for tracking.
| Parameter | Type | Description |
|---|---|---|
| full_symbol | str | Instrument symbol (e.g., "HNXDS:VN30F2511") |
| side | str | "BUY" or "SELL" |
| qty | int | Order quantity |
| price | float | Limit price |
| ord_type | str | "LIMIT", "MARKET", etc. |
| tif | str | Time in force. Default "DAY" (v0.2.7+, was "GTC"). See TIF table below. |
Time-In-Force values (v0.2.7)
| String | FIX tag 59 | Notes |
|---|---|---|
| "DAY" | TimeInForce_DAY | Default (v0.2.7+). Only TIF accepted by Vietnam venues. |
| "GTC" | TimeInForce_GOOD_TILL_CANCEL | Rejected by Vietnam paper as GTC_UNSUPPORTED_DAY_ONLY. Accepted on venues that support it. |
| "IOC" | TimeInForce_IMMEDIATE_OR_CANCEL | Unchanged. |
| "FOK" | TimeInForce_FILL_OR_KILL | Newly mapped in v0.2.7 (silently fell through to IOC in v0.2.6). |
Unknown tif= strings raise ValueError (previously routed to IOC). The pre-v0.2.7 "anything-not-GTC → IOC" behavior no longer applies — pass "IOC" explicitly.
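The strict behavior described above can be pictured as a plain lookup that refuses unknown keys. This is an illustrative sketch, not the client's actual code: `resolve_tif` is a hypothetical name, the wire values are the standard FIX 4.4 TimeInForce (tag 59) codes, and treating the strings as case-sensitive is an assumption.

```python
# Standard FIX 4.4 TimeInForce (tag 59) wire values.
TIF_TO_FIX = {"DAY": "0", "GTC": "1", "IOC": "3", "FOK": "4"}


def resolve_tif(tif="DAY"):
    """Map a TIF string to its FIX tag 59 value; reject anything unknown."""
    try:
        return TIF_TO_FIX[tif]
    except KeyError:
        # No silent fallback to IOC: unknown strings are an error.
        raise ValueError(
            f"Unknown tif {tif!r}; expected one of {sorted(TIF_TO_FIX)}"
        ) from None
```

Failing loudly here means a typo such as "GTD" is caught before the order is sent, instead of being executed as an IOC order.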
cancel_order(cl_ord_id, timeout): Cancel an existing order and wait for server confirmation. Returns (is_terminal, status); if is_terminal is False, the cancel was not confirmed within the timeout and may need a manual retry.
Legacy fire-and-forget shim. Emits DeprecationWarning; removed in v0.3.0. Listen to fix:order:canceled for confirmation.
get_order_status(cl_ord_id): Get the current status string of an order (e.g., "NEW", "FILLED").
wait_for(cl_ord_id, statuses, timeout): Block until the order reaches one of the given statuses, or the timeout expires. Pass-through to OrderManager.
is_order_done(cl_ord_id): True if the status is terminal (Filled / Canceled / Rejected / Expired / DoneForDay).
cleanup_order(cl_ord_id): Drop in-memory state for a settled order. Useful in long-running processes that accumulate terminal orders.
Cumulative filled quantity.
Remaining unfilled quantity (None if unknown).
Last Text(58) field — reject reason or other free-text server feedback.
Load active orders from SQLite and rehydrate in-memory state. Call after wait_until_logged_on() to recover orders from a previous crashed session. Returns a list of order dicts for orders not in a terminal state (Filled, Canceled, Rejected, Expired), shaped like:
[{"cl_ord_id", "order_id", "symbol", "side", "qty", "price", "status", ...}]
Usage
# Place order (v0.2.7: tif defaults to "DAY")
cl_ord_id = client.place_order(
    full_symbol="HNXDS:VN30F2606",
    side="BUY",
    qty=1,
    price=1200.0,
    ord_type="LIMIT",  # default
    # tif="DAY"        # default; only TIF Vietnam venues accept
)
print(f"Order placed: {cl_ord_id}")

# Check status (or wait for a specific status)
status = client.get_order_status(cl_ord_id)
client.wait_for(cl_ord_id, statuses={"NEW", "REJECTED"}, timeout=5.0)

# Cancel order: v0.2.6+ returns (is_terminal, status)
is_terminal, final_status = client.cancel_order(cl_ord_id, timeout=5.0)
if is_terminal:
    print(f"Cancel confirmed: {final_status}")
else:
    print(f"Still pending: {final_status}")

# Cleanup terminal orders in long-running processes
if client.is_order_done(cl_ord_id):
    client.cleanup_order(cl_ord_id)
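When cancel_order() returns is_terminal=False, the cancel was not confirmed in time. One way to handle that is a bounded retry, sketched below. `cancel_with_retry` and `attempts` are hypothetical names, and whether re-sending a cancel is appropriate for a given venue is an assumption you should verify before relying on this.

```python
def cancel_with_retry(client, cl_ord_id, attempts=3, timeout=5.0):
    """Re-issue cancel_order until the order is terminal or attempts run out.

    Returns the last (is_terminal, status) pair seen.
    """
    is_terminal, status = False, None
    for _ in range(attempts):
        is_terminal, status = client.cancel_order(cl_ord_id, timeout=timeout)
        if is_terminal:
            break  # terminal state reached; stop retrying
    return is_terminal, status
```

The caller still has to inspect the returned status, since a terminal state is not necessarily a cancel: the order may have filled while the cancel was in flight.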
Order Status Values
| Status | Description |
|---|---|
| NEW | Order accepted by exchange |
| PARTIALLY_FILLED | Order partially executed |
| FILLED | Order fully executed |
| CANCELED | Order canceled |
| REJECTED | Order rejected by exchange |
| PENDING_NEW | Order pending acceptance |
| PENDING_CANCEL | Cancel request pending |
Account Queries
REST API methods for querying account data, portfolio, and transactions.
Query available cash balance.
{"remainCash": float, "totalCash": float, ...}
Get portfolio positions with P&L for a sub-account.
Expected Output:
# Returns dict with success flag and items list
{
    "success": True,
    "items": [
        {
            "instrument": "HNXDS:VN30F2511",  # Symbol
            "quantity": 2,                    # Position size
            "totalCost": 3900.0,              # Total cost basis
            "currentPrice": 1960.0,           # Current market price
            "marketValue": 3920.0,            # quantity * currentPrice
            "pnl": 20.0                       # Unrealized P&L
        }
    ]
}
Get orders within a date range.
Expected Output:
# Returns dict with success flag and items list
{
    "success": True,
    "items": [
        {
            "orderId": "2503150001",          # Exchange order ID
            "clOrdId": "abc123...",           # Client order ID
            "symbol": "HNXDS:VN30F2511",      # Instrument
            "side": "1",                      # "1"=BUY, "2"=SELL
            "orderQty": 5,                    # Requested quantity
            "cumQty": 3,                      # Filled quantity
            "leavesQty": 2,                   # Remaining quantity
            "price": 1950.0,                  # Limit price
            "avgPx": 1948.5,                  # Average fill price
            "ordStatus": "PARTIALLY_FILLED"   # Order status
        }
    ]
}
Get transaction history within a date range.
Calculate maximum order quantity based on buying power.
Expected Output:
{
    "maxQty": 25,            # Maximum placeable quantity
    "perUnitCost": 1960.0,   # Cost per unit (margin/price)
    "remainCash": 50000.0,   # Available cash/margin
    "unlimited": False       # True if no quantity limit
}
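A typical use of this response is to cap a desired order size before submitting it. The helper below is a sketch under the assumption that the response has exactly the shape shown above; `clamp_to_buying_power` is a hypothetical name, not a library function.

```python
def clamp_to_buying_power(desired_qty, max_info):
    """Cap desired_qty at maxQty unless the account reports no limit."""
    if max_info.get("unlimited"):
        return desired_qty
    # Never return a negative size, even if the server reports maxQty <= 0.
    return max(0, min(desired_qty, int(max_info["maxQty"])))
```

With the sample response above, a desired quantity of 40 would be reduced to 25, while a desired quantity of 10 would pass through unchanged.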
Sub-Account Switching
Context manager for temporary sub-account switching. Thread-safe.
# Query account data
portfolio = client.get_portfolio_by_sub("D1")
positions = portfolio["items"]  # the response is a dict; positions live under "items"
print(f"Positions: {len(positions)}")
for pos in positions:
    print(f"  {pos['instrument']}: {pos['quantity']} @ {pos['currentPrice']}")
    print(f"    P&L: {pos['pnl']}")

# Get max order size
max_info = client.get_max_placeable("HNXDS:VN30F2511", 1950.0, "BUY")
print(f"Max BUY: {max_info['maxQty']} contracts")

# Switch accounts temporarily
with client.use_sub_account("D1"):
    client.place_order("HNXDS:VN30F2511", "SELL", 1, 1950.0)
with client.use_sub_account("D2"):
    client.place_order("HNXDS:VN30F2511", "BUY", 1, 1950.0)
# Restores to default after context exits
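To illustrate what "temporary" and "thread-safe" can mean for such a context manager, here is a self-contained sketch of the general pattern using thread-local storage. It is not the library's implementation: the class `SubAccountScope` and its `current` property are hypothetical, and the real client may scope the switch differently.

```python
import threading
from contextlib import contextmanager


class SubAccountScope:
    """Illustrative only: per-thread override of a default sub-account."""

    def __init__(self, default):
        self._default = default
        self._local = threading.local()  # each thread sees its own override

    @property
    def current(self):
        return getattr(self._local, "sub_account", self._default)

    @contextmanager
    def use_sub_account(self, sub_account):
        previous = self.current
        self._local.sub_account = sub_account
        try:
            yield self
        finally:
            # Restore even if the body raised, so nesting unwinds correctly.
            self._local.sub_account = previous
```

In this pattern a switch made in one thread is invisible to others, and nested `with` blocks restore the enclosing value on exit.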
Events
Subscribe to real-time notifications without polling. Event-driven architecture for instant updates.
Subscribe to an event. Handler called synchronously from emitter thread.
Unsubscribe from an event.
Session Events
| Event | Payload | Description |
|---|---|---|
| fix:logon | session_id | FIX session established |
| fix:logout | session_id, reason | FIX session disconnected |
| fix:logon_error | error, session_id | FIX authentication failed |
| fix:reject | reason, msg_type | FIX message rejected by server |
Order Events
| Event | Payload | Description |
|---|---|---|
| fix:order:accepted | cl_ord_id, status, exec_type, order_id | Exchange acks NEW. |
| fix:order:partial_fill | cl_ord_id, status, last_px, last_qty, cum_qty, avg_px | Non-terminal fill (cum_qty < ordered). Same payload as fix:order:filled. |
| fix:order:filled | cl_ord_id, status, last_px, last_qty, cum_qty, avg_px | Terminal fill (cum_qty == ordered). |
| fix:order:canceled | orig_cl_ord_id, status, cum_qty | Order cancellation confirmed. |
| fix:order:rejected | cl_ord_id, reason, status | Order rejected by exchange. |
SignalDrivenAlpha subscribes to all 5 order-lifecycle events (accepted / partial_fill / filled / canceled / rejected) internally and maintains OrderState / SignalState for you. Read state via ctx.open_orders[sym] and ctx.signals[parent_id]; there is no need to wire these events yourself.
Account & system
| Event | Payload | Description |
|---|---|---|
| account:switch | old, new, scope | use_sub_account() entering or exiting. |
| event:handler_error | original_event, handler_name, error_type, error_message, original_payload | A handler raised. The bus catches, logs, and re-emits; it never crashes. |
Usage
# Session events
def on_logon(session_id, **kw):
print(f"✅ Connected: {session_id}")
def on_logout(session_id, reason=None, **kw):
print(f"Disconnected: {reason or 'Normal logout'}")
def on_error(error, **kw):
print(f"❌ Auth failed: {error}")
client.on("fix:logon", on_logon)
client.on("fix:logout", on_logout)
client.on("fix:logon_error", on_error)
# Order events
def on_accepted(cl_ord_id, status, **kw):
print(f"Order {cl_ord_id[:8]}... accepted: {status}")
def on_filled(cl_ord_id, last_px, last_qty, cum_qty=None, **kw):
print(f"Filled: {last_qty} @ {last_px}")
if cum_qty:
print(f" Total: {cum_qty}")
def on_rejected(cl_ord_id, reason, **kw):
print(f"❌ Rejected: {reason}")
client.on("fix:order:accepted", on_accepted)
client.on("fix:order:filled", on_filled)
client.on("fix:order:rejected", on_rejected)
Best Practice: Always use **kw in handlers to accept future payload additions. Handlers are called synchronously - keep them fast!
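The following toy bus shows why `**kw` matters and how a synchronous dispatcher can turn a failing handler into an `event:handler_error` event instead of crashing. It is a simplified illustration, not the client's event bus: `MiniEventBus` is a hypothetical class, and only the event names and payload field names are taken from the tables in this section.

```python
from collections import defaultdict


class MiniEventBus:
    """Illustrative synchronous event bus with handler-error isolation."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def on(self, event, handler):
        self._handlers[event].append(handler)

    def off(self, event, handler):
        if handler in self._handlers[event]:
            self._handlers[event].remove(handler)

    def emit(self, event, **payload):
        # Handlers run one after another on the caller's thread.
        for handler in list(self._handlers[event]):
            try:
                handler(**payload)
            except Exception as exc:
                if event == "event:handler_error":
                    continue  # do not recurse if the error handler itself fails
                self.emit(
                    "event:handler_error",
                    original_event=event,
                    handler_name=getattr(handler, "__name__", repr(handler)),
                    error_type=type(exc).__name__,
                    error_message=str(exc),
                    original_payload=payload,
                )
```

A handler declared as `def strict(cl_ord_id)` raises TypeError as soon as the payload carries an extra field such as `status`, whereas `def tolerant(cl_ord_id, **kw)` keeps working.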
Market Data
Real-time market data via Redis or Kafka. Both expose the same async subscribe(instrument, callback) API and emit the same QuoteSnapshot shape.
RedisMarketDataClient
Lower latency, simpler setup. Channel format {exchange}:{symbol}. Supports one-shot query().
| Parameter | Type | Description |
|---|---|---|
| host | str | Redis server hostname/IP |
| port | int | Redis server port (default: 6379) |
| password | str | Redis password (optional) |
| merge_updates | bool | True for full snapshots, False for deltas only |
Direct GET for current quote snapshot. Use for one-time price lookups.
Subscribe to real-time updates via Redis pub/sub.
Close Redis connections and cleanup resources.
import os

from paperbroker.market_data import RedisMarketDataClient

# The awaits below must run inside a coroutine (for example one started with asyncio.run).
client = RedisMarketDataClient(
    host=os.getenv("MARKET_REDIS_HOST"),
    port=int(os.getenv("MARKET_REDIS_PORT", 6379)),
    password=os.getenv("MARKET_REDIS_PASSWORD"),
    merge_updates=True,  # Full snapshots
)

# Query mode - one-time lookup
quote = await client.query("HNXDS:VN30F2511")
if quote:
    print(f"Price: {quote.latest_matched_price}")

# Subscribe mode - real-time updates
def on_quote(instrument, quote):
    print(f"{instrument}: {quote.latest_matched_price}")

await client.subscribe("HNXDS:VN30F2511", on_quote)
KafkaMarketDataClient
For production. Higher throughput, message durability, replay capability.
| Parameter | Type | Description |
|---|---|---|
| bootstrap_servers | str | Kafka bootstrap servers (e.g., "kafka-host:9092") |
| username | str | SASL username for Kafka authentication |
| password | str | SASL password for Kafka authentication |
| env_id | str | Environment ID for topic prefix (e.g., "real", "test") |
| merge_updates | bool | True for full snapshots, False for deltas |
Topic Format: {env_id}.{exchange}.{symbol}
Example: real.HNXDS.VN30F2602 for instrument HNXDS:VN30F2602
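The topic naming rule can be expressed as a one-line transformation. The function below is a small illustration of that rule only; `kafka_topic` is a hypothetical helper and is not claimed to exist in the client.

```python
def kafka_topic(env_id, instrument):
    """Build '{env_id}.{exchange}.{symbol}' from an 'EXCHANGE:SYMBOL' instrument."""
    exchange, sep, symbol = instrument.partition(":")
    if not (sep and exchange and symbol):
        raise ValueError(f"Expected EXCHANGE:SYMBOL, got {instrument!r}")
    return f"{env_id}.{exchange}.{symbol}"
```

For instance, `kafka_topic("real", "HNXDS:VN30F2602")` yields the topic name given in the example above.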
Subscribe to real-time quote updates for an instrument.
Start the Kafka consumer. Must be called after subscriptions.
Stop the Kafka consumer and close connections.
Get cached quote snapshot for a subscribed instrument.
Environment Variables
# .env file - Kafka connection
PAPERBROKER_KAFKA_BOOTSTRAP_SERVERS=your-kafka-host:9092
PAPERBROKER_KAFKA_USERNAME=your-username
PAPERBROKER_KAFKA_PASSWORD=your-password
PAPERBROKER_ENV_ID=real
Usage
import asyncio
import os

from dotenv import load_dotenv
from paperbroker.market_data import KafkaMarketDataClient

load_dotenv()

async def main():
    # Create Kafka client
    client = KafkaMarketDataClient(
        bootstrap_servers=os.getenv("PAPERBROKER_KAFKA_BOOTSTRAP_SERVERS"),
        username=os.getenv("PAPERBROKER_KAFKA_USERNAME"),
        password=os.getenv("PAPERBROKER_KAFKA_PASSWORD"),
        env_id=os.getenv("PAPERBROKER_ENV_ID"),
        merge_updates=True,  # Full snapshots
    )

    # Quote callback
    def on_quote(instrument, quote):
        print(f"{instrument}: {quote.latest_matched_price}")
        print(f"  Bid: {quote.bid_price_1} x {quote.bid_quantity_1}")
        print(f"  Ask: {quote.ask_price_1} x {quote.ask_quantity_1}")

    # Subscribe and start
    await client.subscribe("HNXDS:VN30F2602", on_quote)
    await client.start()
    try:
        while True:
            await asyncio.sleep(1)
    finally:
        await client.stop()

asyncio.run(main())
QuoteSnapshot Fields
| Field | Type | Description |
|---|---|---|
| latest_matched_price | float | Last traded price |
| latest_matched_quantity | float | Last traded quantity |
| bid_price_1/2/3 | float | Best bid prices (top 3 levels) |
| bid_quantity_1/2/3 | float | Bid quantities (top 3 levels) |
| ask_price_1/2/3 | float | Best ask prices (top 3 levels) |
| ask_quantity_1/2/3 | float | Ask quantities (top 3 levels) |
| ref_price | float | Reference price |
| spread | float | Bid-ask spread |
| total_matched_quantity | float | Total volume traded |
Alpha Framework v0.2.7
Sealed-lifecycle base class for live algorithmic strategies. Subclass SignalDrivenAlpha, override 4–5 hooks; the framework handles bar aggregation, multi-instrument joint triggers, order placement, fill tracking, state persistence, and clean shutdown.
SignalDrivenAlpha — hook chain
| Hook | Required | Default behavior |
|---|---|---|
| get_indicators(ctx) -> Any | Required | No default. |
| get_signals(indicators, ctx) -> list[Signal] | Required | No default. |
| get_entry_price(signal, ctx) -> float \| None | Required | None ⇒ MARKET (entry signals). |
| get_quantity(signal, ctx) -> int | Required | No default (entry signals). |
| plan_orders(signal, ctx) -> list[OrderRequest] | Optional | Type-dispatched: entry uses get_entry_price + get_quantity; CloseSignal auto-sizes to abs(position) + MARKET. |
AlphaConfig fields
| Field | Type | Notes |
|---|---|---|
| instruments | list[str] | Required, non-empty, no duplicates, EXCHANGE:CODE shape. |
| sub_account | str | Required, non-empty. |
| timeframe | str | e.g. "1m", "5m", "1h". |
| qty | int \| dict[str, int] | Strict positive int. Dict form is per-symbol; must cover every entry in instruments. |
| params | dict | Strategy parameters. Pair with declared_params (class attr) to catch typos at __init__. |
| state_path | str \| None | If set, enables StateStore persistence. Accessing self.state_store raises if unset. |
Convenience: AlphaConfig.single(symbol=..., ...) builds a single-instrument config. config.qty_for(symbol) -> int reads the per-symbol qty.
AlphaContext — per-trigger state
| Attribute | Type | Notes |
|---|---|---|
| bars | dict[str, Deque[Bar]] | Per-symbol bar history. |
| quotes | dict[str, QuoteSnapshot] | Latest L1/L2 per symbol. |
| positions | dict[str, Position] | Signed qty + VWAP from fills. |
| open_orders | dict[str, list[OrderState]] | In-flight orders per symbol (rich state). |
| signals | dict[str, SignalState] | parent_signal_id → aggregated leg state. |
| account | AccountState | Cached cash / equity / margin (1s TTL). |
| now | datetime | Trigger timestamp (tz-aware). |
| triggered_by | list[str] | Symbols whose bar / quote fired this trigger. |
Signal & subtypes
| Type | Purpose |
|---|---|
| Signal(symbol, side, tag, metadata) | The "idea". side: "BUY" / "SELL". Frozen. |
| CloseSignal(...) | Exit intent. Framework auto-sizes to abs(position), MARKET. |
| TakeProfitSignal(...) | Attribution subclass; default tag "tp". |
| StopLossSignal(...) | Attribution subclass; default tag "sl". |
| OrderRequest(symbol, side, qty, price, ord_type, sub_account, tag, parent_signal_id) | Concrete order. May trade a different symbol than the signal (cross-asset). Frozen. |
Trigger pluggability
| Class attr | Default | When to override |
|---|---|---|
| trigger_on_bar | True | Signal-driven default. Set False for pure quote-driven alphas (MM). |
| trigger_on_quote | False | Set True for quote-tick triggered alphas (MM / scalping). |
| should_evaluate_on_quote(instrument, quote) | returns self.trigger_on_quote | Throttle: typical MM impl returns True only when mid moved. |
| bar_window_ms / bar_max_wait_ms | 500 / 5000 | Joint-trigger debounce + hard cap. |
| declared_params | None | Set to a set[str] to catch params key typos at __init__. |
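As an illustration of the "returns True only when mid moved" throttle mentioned for should_evaluate_on_quote, here is a standalone sketch. `MidMoveThrottle` and `min_move` are hypothetical, and the mid price is assumed to be the average of `bid_price_1` and `ask_price_1` from QuoteSnapshot. In a real alpha the same logic would live in the overridden hook.

```python
class MidMoveThrottle:
    """Illustrative throttle: evaluate only when the mid price has moved."""

    def __init__(self, min_move=0.0):
        self.min_move = min_move  # minimum absolute mid change that counts
        self._last_mid = {}       # instrument -> last mid that triggered

    def should_evaluate_on_quote(self, instrument, quote):
        bid, ask = quote.bid_price_1, quote.ask_price_1
        if bid is None or ask is None:
            return False  # one-sided book: no mid to compare
        mid = (bid + ask) / 2.0
        last = self._last_mid.get(instrument)
        if last is not None and abs(mid - last) <= self.min_move:
            return False  # mid unchanged (or within tolerance): skip this tick
        self._last_mid[instrument] = mid
        return True
```

Keeping the last mid per instrument means a multi-instrument alpha throttles each symbol independently.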
Lifecycle
from_paper(...): Construct the alpha with a PaperBrokerClient + RedisMarketDataClient wired from environment variables. Validates required env vars and raises EnvironmentError listing what's missing. Bypass with allow_defaults=True (dev) or pass client= / market_data= explicitly.
run(): Sync convenience; blocks until Ctrl-C or alpha.stop().
start(): Async lifecycle. Subscribes all config.instruments via a single BarAggregator + parallel quote stream, then calls market_data.start() if the backend exposes one (Kafka requires it; for Redis it is a no-op).
Caveat: alpha.start() does not call client.connect(). When constructing the client explicitly (vs from_paper()), connect before alpha.start(). A unified BrokerClient.from_profile() in v0.2.8 will fold this in.
stop(): Cancels the bar-collation timer, flushes state_store, and unsubscribes.
Single-instrument template
from paperbroker.alpha import AlphaContext, Signal, SignalDrivenAlpha

# _rsi(closes, period) is a helper you supply; it is expected to return None
# until enough bars exist.

class RSI1MAlpha(SignalDrivenAlpha):
    declared_params = {"rsi_period", "oversold", "overbought"}

    def get_indicators(self, ctx: AlphaContext):
        sym = self.config.instruments[0]
        closes = [b.close for b in ctx.bars[sym]]
        return {"rsi": _rsi(closes, int(self.config.params["rsi_period"]))}

    def get_signals(self, indicators, ctx):
        sym, rsi = self.config.instruments[0], indicators["rsi"]
        if rsi is None: return []
        pos = ctx.positions.get(sym)
        if pos and abs(pos.quantity) > 1e-9:
            # In a position: exit when RSI crosses back through 50
            if pos.quantity > 0 and rsi > 50: return [Signal(sym, "SELL", tag="exit")]
            if pos.quantity < 0 and rsi < 50: return [Signal(sym, "BUY", tag="exit")]
            return []
        if rsi < 30: return [Signal(sym, "BUY", tag="entry")]
        if rsi > 70: return [Signal(sym, "SELL", tag="entry")]
        return []

    def get_entry_price(self, signal, ctx): return ctx.bars[signal.symbol][-1].close
    def get_quantity(self, signal, ctx): return self.config.qty_for(signal.symbol)

alpha = RSI1MAlpha.from_paper(
    instruments=["HNXDS:VN30F2606"],
    sub_account="main", timeframe="1m", qty=1,
    params={"rsi_period": 14, "oversold": 30, "overbought": 70},
    state_path="state/rsi1m.json",
)
alpha.run()
Multi-instrument joint trigger
Pass instruments=[A, B, ...] and per-symbol qty={A: ..., B: ...}. The framework collates bar events within bar_window_ms; one AlphaContext is built and the hook chain fires once per burst. ctx.triggered_by lists which symbols' bars closed in the window. get_signals can return a list of Signals for atomic multi-leg dispatch (pair-trade, basket, cross-asset).
CLI v0.2.6+
YAML-driven alpha runner. Requires the optional [cli] extras.
pip install 'paperbroker_client[cli]'
paperbroker run --config examples/09_alpha_rsi_1m.yaml
paperbroker diagnose
YAML schema
alpha:
  # Dotted module path OR file-path form (supports digit-prefix filenames):
  class: examples/09_alpha_rsi_1m.py:RSI1MAlpha
  instruments:
    - HNXDS:VN30F2606
  sub_account: main
  timeframe: 1m
  qty: 1
  params:
    rsi_period: 14
    oversold: 30
    overbought: 70
  state_path: state/rsi1m.json
The class: field accepts either a dotted module path (my_pkg.MyAlpha) or a file-path form (examples/09_alpha_rsi_1m.py:RSI1MAlpha) — the latter handles digit-prefixed example filenames.
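To make the two accepted forms concrete, the sketch below splits a class: value into its parts. It only illustrates the distinction described above; `parse_class_spec` is a hypothetical helper, the requirement that the file form ends in `.py` is an assumption, and the CLI's actual resolution logic is not documented here.

```python
def parse_class_spec(spec):
    """Split a class spec into (kind, location, class_name).

    kind is "file" for 'path/to/file.py:ClassName' and
    "module" for 'package.module.ClassName'.
    """
    if ":" in spec:
        # File-path form: split on the last colon so the path may contain colons.
        path, _, class_name = spec.rpartition(":")
        if not path.endswith(".py") or not class_name:
            raise ValueError(f"Invalid file-path class spec: {spec!r}")
        return ("file", path, class_name)
    # Dotted form: everything before the last dot is the module path.
    module, _, class_name = spec.rpartition(".")
    if not module or not class_name:
        raise ValueError(f"Invalid dotted class spec: {spec!r}")
    return ("module", module, class_name)
```

The file-path form exists because a name like `09_alpha_rsi_1m` starts with a digit and so cannot appear in a dotted import path.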