带你详细了解miniQMT的xttrader模块 (第四篇) async 方法与性能提升

QUANT 2025-12-11 19:02:07 57 举报

在之前的文章中,我们已经学习了回调机制的基础

本章我们将深入探索回调机制的几个相关问题


1. 异步:async 方法与性能提升


同步 vs. 异步:为什么要用异步函数处理?


order_stock 是一个同步下单函数。当调用它时,程序会停在原地,直到 MiniQMT 的后台确认已经将委托发送出去,并返回一个 order_id。这个过程虽然简单,但在高频或多任务场景下不适用
想象一下,如果网络稍微延迟,或者我们想在短时间内连续下单,同步模式下,每个订单都必须等待前一个完成后才能发送,总耗时会是每笔单子等待时间的总和


而异步方法,如 order_stock_async,则更高效。我们把下单指令交给它,它立刻回复:已执行,这是任务编号(seq)”,然后程序就可以马上去做别的事情了。它会在后台处理发送任务,处理完成后,再通过 on_order_stock_async_response 回调函数把正式的委托编号(order_id)给到我们

order_stock_async 的工作流如下:
1. 调用:seq = xt_trader.order_stock_async(...)
2. 立即返回:得到一个大于 0 的 seq 序号,代表下单请求已被系统接受
3. 后台处理:XtQuantTrader 的内部线程负责将这个请求发送给服务器
4. 回调通知:服务器处理后,on_order_stock_async_response 回调被触发,返回的 response 对象中同时包含了当初得到的 seq 和服务器生成的 order_id。

实战:用 seq 序号匹配异步响应
关键在于,我们需要一个地方存储 seq 和下单意图的对应关系,以便在回调触发时知道是哪个单成功下出

示例代码如下:

"""
异步下单与 seq 序号匹配

核心思想:
1. 调用 order_stock_async() 异步下单,立刻得到 seq 序号
2. 用字典记录 seq 和订单信息的对应关系
3. 在 on_order_stock_async_response 回调中,根据 seq 找到对应的 order_id
4. 完成 seq -> order_id 的映射
"""

import time
import logging
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount
from xtquant.xtconstant import STOCK_BUY, FIX_PRICE

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')

# ============================================================================
# 核心:用字典存储 seq 和订单的对应关系
# ============================================================================

class SimpleAsyncManager:
    """简单的异步订单管理器"""

    def __init__(self):
        # 关键数据结构:{ seq: { stock_code, volume, order_id } }
        self.requests = {}

    def add_request(self, seq, stock_code, volume):
        """记录一个异步请求"""
        self.requests[seq] = {
            'stock_code': stock_code,
            'volume': volume,
            'order_id': None  # 初始为 None,等待回调填充
        }
        logging.info(f"✓ 记录请求:seq={seq}, 股票={stock_code}, 数量={volume}")

    def match_order_id(self, seq, order_id):
        """匹配 seq 和 order_id"""
        if seq in self.requests:
            self.requests[seq]['order_id'] = order_id
            logging.info(f"✓ 匹配成功:seq={seq} -> order_id={order_id}")
        else:
            logging.warning(f"⚠️ 未找到 seq={seq} 的请求")

# ============================================================================
# 回调类
# ============================================================================

class SimpleAsyncCallback(XtQuantTraderCallback):
    """简单的异步回调处理"""

    def __init__(self, manager):
        super().__init__()
        self.manager = manager

    def on_order_stock_async_response(self, response):
        """
        异步下单响应回调

        response 对象包含:
        - response.seq: 你当初得到的序号
        - response.order_id: 交易所生成的订单号
        """
        seq = response.seq
        order_id = response.order_id
        logging.info(f"📨 收到异步响应:seq={seq}, order_id={order_id}")
        self.manager.match_order_id(seq, order_id)

    def on_order_error(self, order_error):
        """下单错误"""
        logging.error(f"❌ 错误:{order_error.error_msg}")

# ============================================================================
# 主程序
# ============================================================================

def main():
    path = 'D:/XXXXQMT交易端模拟/userdata_mini/'
    account_id = '100000001'

    session_id = int(time.time())
    xt_trader = XtQuantTrader(path, session_id)
    acc = StockAccount(account_id)

    # 创建管理器和回调
    manager = SimpleAsyncManager()
    callback = SimpleAsyncCallback(manager)
    xt_trader.register_callback(callback)

    # 启动和连接
    xt_trader.start()
    if xt_trader.connect() != 0:
        logging.error("连接失败")
        return

    if xt_trader.subscribe(acc) != 0:
        logging.error("订阅失败")
        return

    logging.info("连接成功")
    time.sleep(1)

    # ========================================================================
    # 异步下单:快速连续下多个单
    # ========================================================================

    logging.info("=" * 60)
    logging.info("开始异步下单")
    logging.info("=" * 60)

    # 下单1
    seq1 = xt_trader.order_stock_async(
        acc, '600000.SH', STOCK_BUY, 100, FIX_PRICE, 8.0, 'test', 'order1'
    )
    manager.add_request(seq1, '600000.SH', 100)

    # 下单2(立刻发送,不用等待下单1的响应)
    seq2 = xt_trader.order_stock_async(
        acc, '601988.SH', STOCK_BUY, 200, FIX_PRICE, 5.0, 'test', 'order2'
    )
    manager.add_request(seq2, '601988.SH', 200)

    # 下单3
    seq3 = xt_trader.order_stock_async(
        acc, '000001.SZ', STOCK_BUY, 300, FIX_PRICE, 12.0, 'test', 'order3'
    )
    manager.add_request(seq3, '000001.SZ', 300)

    logging.info(f"已发送 3 个异步下单请求")

    # ========================================================================
    # 等待回调处理
    # ========================================================================

    logging.info("等待异步响应... (3秒)")
    time.sleep(3)

    # ========================================================================
    # 打印结果
    # ========================================================================

    logging.info("=" * 60)
    logging.info("最终结果")
    logging.info("=" * 60)

    for seq, info in manager.requests.items():
        logging.info(
            f"seq={seq} | 股票={info['stock_code']} | "
            f"数量={info['volume']} | order_id={info['order_id']}"
        )

    xt_trader.stop()
    logging.info("程序结束")

if __name__ == "__main__":
    main()

日志如下:



QMT/miniQMT免费申请

QMT免费领取学习案例

QMT落地辅助策略代写服务

需要的朋友欢迎联系  ~


尊重知识,尊重市场 1

著作权归文章作者所有。

最新回复 ( 0 )
发新帖
0