在之前的文章中,我们已经学习了回调机制的基础
本章我们将深入探索回调机制的几个相关问题
1. 异步:async 方法与性能提升
同步 vs. 异步:为什么要用异步函数处理?
order_stock 是一个同步下单函数。当调用它时,程序会停在原地,直到 MiniQMT 的后台确认已经将委托发送出去,并返回一个 order_id。这个过程虽然简单,但在高频或多任务场景下不适用
想象一下,如果网络稍微延迟,或者我们想在短时间内连续下单,同步模式下,每个订单都必须等待前一个完成后才能发送,总耗时会是每笔单子等待时间的总和
而异步方法,如 order_stock_async,则更高效。我们把下单指令交给它,它立刻回复:已执行,这是任务编号(seq)”,然后程序就可以马上去做别的事情了。它会在后台处理发送任务,处理完成后,再通过 on_order_stock_async_response 回调函数把正式的委托编号(order_id)给到我们
order_stock_async 的工作流如下:
1. 调用:seq = xt_trader.order_stock_async(...)
2. 立即返回:得到一个大于 0 的 seq 序号,代表下单请求已被系统接受
3. 后台处理:XtQuantTrader 的内部线程负责将这个请求发送给服务器
4. 回调通知:服务器处理后,on_order_stock_async_response 回调被触发,返回的 response 对象中同时包含了当初得到的 seq 和服务器生成的 order_id。
实战:用 seq 序号匹配异步响应
关键在于,我们需要一个地方存储 seq 和下单意图的对应关系,以便在回调触发时知道是哪个单成功下出
示例代码如下:
"""
异步下单与 seq 序号匹配
核心思想:
1. 调用 order_stock_async() 异步下单,立刻得到 seq 序号
2. 用字典记录 seq 和订单信息的对应关系
3. 在 on_order_stock_async_response 回调中,根据 seq 找到对应的 order_id
4. 完成 seq -> order_id 的映射
"""
import time
import logging
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount
from xtquant.xtconstant import STOCK_BUY, FIX_PRICE
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
# ============================================================================
# 核心:用字典存储 seq 和订单的对应关系
# ============================================================================
class SimpleAsyncManager:
"""简单的异步订单管理器"""
def __init__(self):
# 关键数据结构:{ seq: { stock_code, volume, order_id } }
self.requests = {}
def add_request(self, seq, stock_code, volume):
"""记录一个异步请求"""
self.requests[seq] = {
'stock_code': stock_code,
'volume': volume,
'order_id': None # 初始为 None,等待回调填充
}
logging.info(f"✓ 记录请求:seq={seq}, 股票={stock_code}, 数量={volume}")
def match_order_id(self, seq, order_id):
"""匹配 seq 和 order_id"""
if seq in self.requests:
self.requests[seq]['order_id'] = order_id
logging.info(f"✓ 匹配成功:seq={seq} -> order_id={order_id}")
else:
logging.warning(f"⚠️ 未找到 seq={seq} 的请求")
# ============================================================================
# 回调类
# ============================================================================
class SimpleAsyncCallback(XtQuantTraderCallback):
"""简单的异步回调处理"""
def __init__(self, manager):
super().__init__()
self.manager = manager
def on_order_stock_async_response(self, response):
"""
异步下单响应回调
response 对象包含:
- response.seq: 你当初得到的序号
- response.order_id: 交易所生成的订单号
"""
seq = response.seq
order_id = response.order_id
logging.info(f"📨 收到异步响应:seq={seq}, order_id={order_id}")
self.manager.match_order_id(seq, order_id)
def on_order_error(self, order_error):
"""下单错误"""
logging.error(f"❌ 错误:{order_error.error_msg}")
# ============================================================================
# 主程序
# ============================================================================
def main():
path = 'D:/XXXXQMT交易端模拟/userdata_mini/'
account_id = '100000001'
session_id = int(time.time())
xt_trader = XtQuantTrader(path, session_id)
acc = StockAccount(account_id)
# 创建管理器和回调
manager = SimpleAsyncManager()
callback = SimpleAsyncCallback(manager)
xt_trader.register_callback(callback)
# 启动和连接
xt_trader.start()
if xt_trader.connect() != 0:
logging.error("连接失败")
return
if xt_trader.subscribe(acc) != 0:
logging.error("订阅失败")
return
logging.info("连接成功")
time.sleep(1)
# ========================================================================
# 异步下单:快速连续下多个单
# ========================================================================
logging.info("=" * 60)
logging.info("开始异步下单")
logging.info("=" * 60)
# 下单1
seq1 = xt_trader.order_stock_async(
acc, '600000.SH', STOCK_BUY, 100, FIX_PRICE, 8.0, 'test', 'order1'
)
manager.add_request(seq1, '600000.SH', 100)
# 下单2(立刻发送,不用等待下单1的响应)
seq2 = xt_trader.order_stock_async(
acc, '601988.SH', STOCK_BUY, 200, FIX_PRICE, 5.0, 'test', 'order2'
)
manager.add_request(seq2, '601988.SH', 200)
# 下单3
seq3 = xt_trader.order_stock_async(
acc, '000001.SZ', STOCK_BUY, 300, FIX_PRICE, 12.0, 'test', 'order3'
)
manager.add_request(seq3, '000001.SZ', 300)
logging.info(f"已发送 3 个异步下单请求")
# ========================================================================
# 等待回调处理
# ========================================================================
logging.info("等待异步响应... (3秒)")
time.sleep(3)
# ========================================================================
# 打印结果
# ========================================================================
logging.info("=" * 60)
logging.info("最终结果")
logging.info("=" * 60)
for seq, info in manager.requests.items():
logging.info(
f"seq={seq} | 股票={info['stock_code']} | "
f"数量={info['volume']} | order_id={info['order_id']}"
)
xt_trader.stop()
logging.info("程序结束")
if __name__ == "__main__":
main()日志如下:
QMT/miniQMT免费申请
QMT免费领取学习案例
QMT落地辅助策略代写服务
需要的朋友欢迎联系 ~
著作权归文章作者所有。