回想一下,当我们下单买入一只股票后,最关心的是什么?—— ”我的委托成交了吗 ?成交多少? ”
如果没有回调机制,我们可能就需要每隔一秒就去主动查询一次订单状态,
比如我们总是问店员:我的菜好了吗,什么时候上等等,这种方式的专业名词被称为被称为
”主动查询“它的缺点显而易见:
1.效率低下:大部分查询都是无效的,因为订单状态并不会每秒都变就像饭还没做好,我们问再多次也没办法吃上
2.延迟高:我们可能在查询的间隔期错过了状态变化的第一时间
3.资源浪费:频繁的查询会给交易服务器和我们的程序带来不必要的压力
而回调机制则采用事件驱动模式。我们只需要告诉店员:“等我的菜有任何变化时(在洗菜了,下锅了...),请通知我。”
子类中。一旦有相应的事件发生(如连接断开、订单状态更新、成交回报等),交易API就会自动、实时地调用我们写好的对应方法
剖析回调函数: XtQuantTraderCallback 中定义了多种回调方法,以应对不同的交易事件。一个功能完备的回调类应该尽可能地实现这些方法,以便全面监控交易过程。
import logging
from xtquant.xttrader import XtQuantTraderCallback
# 配置一个简单的日志记录器,比 print 更专业
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class ComprehensiveCallback(XtQuantTraderCallback):
"""一个功能全面的回调处理类"""
def on_disconnected(self):
"""连接断开回调"""
logging.warning("交易服务器连接已断开!")
def on_stock_order(self, order):
"""
委托回报推送。当委托状态发生任何变化时(如已报、部成、已成、已撤),此方法都会被调用。
:param order: XtOrder 对象,包含了委托的详细信息。
"""
logging.info(
f"[委托回报] 账户:{order.account_id} | 股票:{order.stock_code} | "
f"编号:{order.order_id} | 状态:{order.order_status} | "
f"委托量:{order.order_volume} | 成交量:{order.traded_volume}"
)
def on_stock_trade(self, trade):
"""
成交回报推送。只有当委托实际产生交易时,此方法才会被调用。
:param trade: XtTrade 对象,包含了成交的详细信息。
"""
logging.info(
f"[成交回报] 账户:{trade.account_id} | 股票:{trade.stock_code} | "
f"成交编号:{trade.traded_id} | 价格:{trade.traded_price} | "
f"数量:{trade.traded_volume} | 金额:{trade.traded_amount}"
)
def on_stock_asset(self, asset):
"""
资金变动推送。当账户资金发生变化(如买卖股票、出入金)时被调用。
:param asset: XtAsset 对象,包含了最新的资金状况。
"""
logging.info(
f"[资金更新] 账户:{asset.account_id} | 总资产:{asset.total_asset:.2f} | "
f"可用资金:{asset.cash:.2f} | 持仓市值:{asset.market_value:.2f}"
)
def on_stock_position(self, position):
"""
持仓变动推送。当账户持仓发生变化时被调用。
:param position: XtPosition 对象,包含了单只股票的最新持仓信息。
"""
logging.info(
f"[持仓更新] 账户:{position.account_id} | 股票:{position.stock_code} | "
f"持仓量:{position.volume} | 可用数量:{position.can_use_volume} | "
f"市值:{position.market_value:.2f}"
)
def on_order_error(self, order_error):
"""
委托失败推送。当一个委托请求因为某些原因(如资金不足、参数错误)未能成功发送到交易所时被调用。
:param order_error: XtOrderError 对象。
"""
logging.error(
f"[委托失败] 账户:{order_error.account_id} | 委托编号:{order_error.order_id} | "
f"错误ID:{order_error.error_id} | 错误信息:{order_error.error_msg}"
)
def on_cancel_error(self, cancel_error):
"""
撤单失败推送。当一个撤单请求未能成功时被调用。
:param cancel_error: XtCancelError 对象。
"""
logging.error(
f"[撤单失败] 账户:{cancel_error.account_id} | 委托编号:{cancel_error.order_id} | "
f"错误ID:{cancel_error.error_id} | 错误信息:{cancel_error.error_msg}"
)
def on_account_status(self, status):
"""
账户状态变化推送。例如登录成功、登录失败、连接断开等。
:param status: XtAccountStatus 对象。
"""
logging.info(
f"[账户状态] 账户:{status.account_id} | 类型:{status.account_type} | "
f"状态:{status.status} | 描述:{status.status_msg}"
)观察回调的实时触发
现在,让我们编写一个完整的、可运行的程序,体会一下看看这些回调是如何被触发的。
我们这个程序将执行以下操作:
1.连接并订阅账户
2.查询并打印初始资金和持仓
3.下一个买单
4.等待几秒钟
5.撤销刚才的买单
6.等待程序结束
在整个过程中,我们将会通过 ComprehensiveCallback 打印的日志来观察每一步触发了哪些事件
import time
import logging
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount
from xtquant.xtconstant import STOCK_BUY, FIX_PRICE
# --- 1. 定义功能全面的回调类 (复制上一节的 ComprehensiveCallback 代码到这里) ---
# (此处省略,请将上一节的 ComprehensiveCallback 完整代码粘贴于此)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class ComprehensiveCallback(XtQuantTraderCallback):
# ... (完整代码)
def on_disconnected(self):
logging.warning("交易服务器连接已断开!")
def on_stock_order(self, order):
logging.info(
f"[委托回报] 账户:{order.account_id} | 股票:{order.stock_code} | "
f"编号:{order.order_id} | 状态:{order.order_status} | "
f"委托量:{order.order_volume} | 成交量:{order.traded_volume}"
)
def on_stock_trade(self, trade):
logging.info(
f"[成交回报] 账户:{trade.account_id} | 股票:{trade.stock_code} | "
f"成交编号:{trade.traded_id} | 价格:{trade.traded_price} | "
f"数量:{trade.traded_volume} | 金额:{trade.traded_amount}"
)
def on_stock_asset(self, asset):
logging.info(
f"[资金更新] 账户:{asset.account_id} | 总资产:{asset.total_asset:.2f} | "
f"可用资金:{asset.cash:.2f} | 持仓市值:{asset.market_value:.2f}"
)
def on_stock_position(self, position):
logging.info(
f"[持仓更新] 账户:{position.account_id} | 股票:{position.stock_code} | "
f"持仓量:{position.volume} | 可用数量:{position.can_use_volume} | "
f"市值:{position.market_value:.2f}"
)
def on_order_error(self, order_error):
logging.error(
f"[委托失败] 账户:{order_error.account_id} | 委托编号:{order_error.order_id} | "
f"错误ID:{order_error.error_id} | 错误信息:{order_error.error_msg}"
)
def on_cancel_error(self, cancel_error):
logging.error(
f"[撤单失败] 账户:{cancel_error.account_id} | 委托编号:{cancel_error.order_id} | "
f"错误ID:{cancel_error.error_id} | 错误信息:{cancel_error.error_msg}"
)
def on_account_status(self, status):
logging.info(
f"[账户状态] 账户:{status.account_id} | 类型:{status.account_type} | "
f"状态:{status.status} | 描述:{status.status_msg}"
)
# --- 2. 主程序 ---
def main():
path = 'D:/XXXXXQMT交易端模拟/userdata_mini/' # 【重要】修改为你的路径
account_id = '100002000' # 【重要】修改为你的客户号
session_id = int(time.time())
xt_trader = XtQuantTrader(path, session_id)
acc = StockAccount(account_id)
callback = ComprehensiveCallback()
xt_trader.register_callback(callback)
xt_trader.start()
connect_result = xt_trader.connect()
if connect_result != 0:
logging.error(f"连接失败,错误码: {connect_result}")
return
subscribe_result = xt_trader.subscribe(acc)
if subscribe_result != 0:
logging.error(f"订阅失败,错误码: {subscribe_result}")
return
logging.info("连接和订阅成功,准备执行交易操作...")
time.sleep(1) # 等待一下,确保订阅信息处理完毕
# --- 步骤1: 查询初始状态 ---
logging.info("--- 查询初始资金 ---")
initial_asset = xt_trader.query_stock_asset(acc)
if initial_asset:
callback.on_stock_asset(initial_asset) # 手动调用一次,方便对比
# --- 步骤2: 下一个买单 ---
stock_code = '600000.SH' # 浦发银行
price = 8.0 # 注意:设置一个当前不可能成交的价格,以便后续撤单
volume = 100
logging.info(f"--- 准备下单: 买入 {stock_code} {volume}股,价格 {price} ---")
order_id = xt_trader.order_stock(acc, stock_code, STOCK_BUY, volume, FIX_PRICE, price)
if order_id <= 0:
logging.error(f"下单失败,错误码: {order_id}")
xt_trader.stop()
return
logging.info(f"下单请求已发送,委托编号: {order_id}")
# --- 步骤3: 等待几秒,观察委托回报 ---
logging.info("--- 等待5秒,观察委托状态变化... ---")
time.sleep(5)
# --- 步骤4: 撤销订单 ---
logging.info(f"--- 准备撤销委托: {order_id} ---")
cancel_result = xt_trader.cancel_order_stock(acc, order_id)
if cancel_result == 0:
logging.info("撤单请求已发送")
else:
logging.error(f"撤单请求发送失败,错误码: {cancel_result}")
# --- 步骤5: 再等待几秒,观察撤单回报 ---
logging.info("--- 等待5秒,观察最终状态... ---")
time.sleep(5)
# --- 步骤6: 停止程序 ---
xt_trader.stop()
logging.info("程序结束")
if __name__ == "__main__":
main()日志如下:
当我们运行上述代码后,会在控制台看到类似下面这样的日志流(具体时间、编号会不同):
1.程序启动,连接和订阅成功。
2.打印初始资金信息。
3.打印 --- 准备下单 ---。
4.on_stock_order 被触发,打印 [委托回报],状态可能是“已报”或“待报”。
5.如果下单导致资金冻结,on_stock_asset 可能会被触发,打印 [资金更新]。
6.程序等待5秒。
7.打印 --- 准备撤销委托 ---。
8.on_stock_order 再次被触发,打印 [委托回报],状态变为“已撤”。
9.如果撤单解冻了资金,on_stock_asset 会再次被触发,打印 [资金更新]。
10.程序等待5秒后结束。
这个案例我是发出了下单和撤单两个主动指令,而程序通过回调机制,自动地为我捕捉并报告了这期间发生的所有相关事件
学会了吗?
QMT/miniQMT免费申请
QMT免费领取学习案例
QMT落地辅助策略代写服务
需要欢迎联系 ~
著作权归文章作者所有。 未经作者允许禁止转载!