同花顺
2025年8月22日大约 8 分钟
请求api
import http.client
conn = http.client.HTTPSConnection("catpd.cn")
conn.request("GET", "/") # 必须先发送请求
response = conn.getresponse() # 然后才能获取响应
data = response.read()
print(data)
conn.close() # 使用完毕后关闭连接API
数据表
api
print(1111)
data = get_fundamentals(
query(
asharevalue.symbol,
asharevalue.report_date,
asharevalue.stat_date,
asharevalue.pe_lyr,
asharevalue.pe_ttm
).filter(
asharevalue.symbol=='601997.SH'
),
date = '20250812'
)
print(data)def init(context):
#设置策略初始持仓
set_holding_stocks({'000001.SZ': 200,'300033.SZ': 500,'600519.SH': 700})#====================================================================================================
#在编写策略时,我们首先需要大致确定一下我们编写过程需要用到的几个Python包,比如numpy.pandas.datetime,您
#需要根据您的需求,在策略的最上方进行导包操作。
from datetime import timedelta, date
#导入datetime相关包
import pandas as pd
#导入pandas包,用于数据分析。
#====================================================================================================
#====================================================================================================
#首先我们需要一个策略初始化函数,其功能相当于开户。
def init(context):
#def是Python语言创建函数的标志,initialize是函数名称,这里是初始化函数,(context)是账户信息对象,也就是
#该初始化函数中,只有初始一些账户信息。并且该函数只运行一次,相当于开户只需要一次就可以了。
context.n = 15
#设置n=15,并将其放到账户信息中,主要为了控制持股数量,控制到15个及以内。
run_monthly(trade,date_rule=-1)
#按月运行函数run_monthly(a,b),其中里面有两个参数,a是运行对象,这里是trade函数(详见34行代码),
#b是运行时间,这里是每个月倒数第一个交易日。
#====================================================================================================
#====================================================================================================
#其次我们需要构建一个交易系统,使得整个策略能运行起来!
def trade(context, bar_dict):
#我们创建一个取名为trade的函数,用于交易,其中设置context参数,我们就可以到别的函数中的context对象中的信息
#用到这里来,另外我们还有bar_dict参数,用于计算数据。
pb_list = stocks_pb(context,bar_dict)
#我们将stocks_pb(context,bar_dict)函数的选股结果导入pb_list对象
stock_list = list(set(pb_list))
#我们需要将pb_list对象的格式进行转化,使得运行不出错,直接用list(set(x))即可。其中set()是创建集合
#这里是创建股票集合,list()用来创建列表,这里是创建股票列表。
if len(list(context.portfolio.stock_account.positions.keys())) > 0:
#if 判断函数,len()是取长度数值,context.positions是账户持仓信息,len(context.positions)也就是
#当前持仓股票的数量,如果大于0,则进行下一步。
for stock in list(context.portfolio.stock_account.positions.keys()):
#for x in y 是循环函数,从y中逐一取出来设为x,这里的y是持仓个股列表。x是stock,将个股取出,
#stock中存放取出的个股,执行下一步,如果一个个股执行完毕,执行下一个个股,直到账户中所有持仓
#个股全部被执行后,跳出循环。
if stock not in stock_list:
#if判断函数,判断选股股票是否在选股函数列表,不在则运行下一步
order_target(stock, 0)
#order_target下单函数,stock是买卖的股,0是目前持仓,也就是清仓。
if len(stock_list) > 0:
#if 判断函数,判断选股列表中股票数量是否大于0
for stock in stock_list:
#for x in y循环函数,从股票列表中逐一取出个股
if stock not in list(context.portfolio.stock_account.positions.keys()):
#if判断函数,用于判断个股是否在个股持仓列表中。
if len(list(context.portfolio.stock_account.positions.keys())) < context.n :
#if 判断函数,用于判断持仓个股数,是否小于我们初始化设置的条件15个。
number = context.n - len(list(context.portfolio.stock_account.positions.keys()))
#如果不超出,那么计算,我们还能买的个股数量
order_value(stock,context.portfolio.stock_account.available_cash/number)
#下单函数,执行买入,买入的金额为可用现金/可买个股数
else:
order_value(stock,context.portfolio.stock_account.available_cash)
#else是if函数的衍生,如果个股数量等于或超过最大持仓,那么我们将剩下的资金全部买入
#====================================================================================================
#我们需要创建一个函数,其作用是选择我们股票,因子是pb市净率,来让我们的交易系统有股票可以买卖
def stocks_pb(context,bar_dict):
#我们创建一个取名为stocks_pb的函数,用于选股,其中设置context参数,我们就可以到别的函数中的context对象
#中的信息用到这里来,另外我们还有bar_dict参数,用于计算数据。
last_date = get_last_datetime().strftime('%Y%m%d')
#get_last_datetime()函数用来获取回测的前一天的日期,后面的strftime('%Y%m%d')是用来转化成年月日的格式。
pb = get_fundamentals(query(valuation.symbol,valuation.pb
).filter(valuation.pb > 0,
).order_by(valuation.pb.asc()
),date = last_date)
#get_fundamentals是获取财务数据的函数,query(x)是获取对象,这里是股票和pb,filter(y)是用来筛选条件,这
#里是pb>0,order_by(z)用来排序,将选择后的个股排序,这里是pb从小到大,date是日期,这里是回测前一日。
#其中sec()是从小到大排序。
return list(pb['valuation_symbol'][:15])
#return用来输出函数的执行结果。这里是我们选择出来的股票列表的前15只个股。def init(context):
context.n = 10
run_daily(trade)
def trade(context, bar_dict):
print(context.portfolio.stock_account)
# context.portfolio.stock_account.positions
pb = stocks_pb(context,bar_dict)
print(pb)
def add_code_name(code:str):
"""增加股票名称"""
data = get_security_info(code)
return data.display_name
def stocks_pb(context,bar_dict):
"""
0<PE扣非<10 0<PB<1.5 股息率>3%
"""
pb = get_fundamentals(query(
valuation.symbol,valuation.pb,valuation.pe_ttm,valuation.dividend_rate_12_months
).filter(
valuation.pe_ttm >0 ,
valuation.pe_ttm <10 ,
valuation.dividend_rate_12_months > 3,
valuation.pb >0,
valuation.pb < 1.5,
)
.order_by(valuation.dividend_rate_12_months.desc())
,date='20250819')
pb['name'] = pb['valuation_symbol'].apply(add_code_name)
return pb同型分析
def get_stock_industry_stocks(symbol, category_type='ci', date='20230801'):
"""
根据股票代码和分类类型返回对应行业的成分股列表,
并打印详细的所属行业信息(含中文名称)。
参数:
----------
symbol : str
股票代码,例如 '300033.SZ'
category_type : str
行业分类类型,可选值:
- 't' : 同花顺行业 (industryid)
- 's' : 申万行业 (s_industryid)
- 'c' : 中信行业 (c_industryid)
- 'ci' : 中信行业 (ci_industryid)
- 'gi' : 全球行业分类 (gi_industryid)
date : str
查询日期,格式 'YYYYMMDD'
返回:
----------
list[str]
行业成分股代码列表
"""
# 1️⃣ 获取股票所属行业代码信息
industry_info = get_symbol_industry(symbol, date=date)
# 各体系字段映射
category_map = {
't': ('同花顺行业', ['industryid1', 'industryid2', 'industryid3']),
's': ('申万行业', ['s_industryid1', 's_industryid2', 's_industryid3']),
'c': ('中信行业', ['c_industryid', 'c_industryid2']),
'ci': ('中信行业', ['ci_industryid1', 'ci_industryid2', 'ci_industryid3']),
'gi': ('全球行业', ['gi_industryid1', 'gi_industryid2', 'gi_industryid3', 'gi_industryid4']),
}
if category_type not in category_map:
raise ValueError(f"未知分类类型: {category_type},可选值: {list(category_map.keys())}")
category_name, fields = category_map[category_type]
# 2️⃣ 获取行业中文名映射表(只针对当前体系)
relate_type = f"{fields[0]}" # 例如 "ci_industryid1"
try:
industry_relate_df = get_industry_relate(date=date, types=relate_type)
except Exception:
industry_relate_df = None
# 3️⃣ 构造行业路径
industry_path = []
for f in fields:
val = getattr(industry_info, f, None)
if val:
# 查询中文名
name = None
if industry_relate_df is not None:
match = industry_relate_df[industry_relate_df['industry_symbol'] == val]
if not match.empty:
name = match.index[0] # index 是行业中文名
if name:
industry_path.append(f"{f}={val}({name})")
else:
industry_path.append(f"{f}={val}")
# 取第一个层级代码作为行业成分股查询代码
industry_code = getattr(industry_info, fields[0])
# 4️⃣ 打印行业信息
print(f"【股票代码】{symbol}")
print(f"【分类类型】{category_type}({category_name})")
print("【所属行业路径】")
for p in industry_path:
print(" -", p)
print(f"【用于成分股提取的行业代码】{industry_code}\n")
# 5️⃣ 获取成分股
index_list = get_industry_stocks(industry_code, date)
print(f"【成分股数量】{len(index_list)}")
return index_list
def txfx(code, statDate='2025q1'):
df = get_fundamentals(
query(
income.symbol,
income.report_date,
income.stat_date,
income.operating_income,
income.net_profit,
income.operations_costs,
income.operations_taxes_and_surcharges,
income.sales_fee,
income.manage_fee,
income.financial_expenses,
income.impairment_loss_on_assets,
income.fv_chg_income,
income.investment_income,
income.profit_from_operations,
income.profit_before_tax,
income.minus_income_tax_expenses,
income.net_profit
).filter(
income.symbol == code,
income.reporttypecode == 'HB'
),
statDate=statDate
)
if df.empty:
return None
base = df['income_stat_operating_income'].iloc[0]
cols = [
'income_stat_operating_income',
'income_stat_operations_costs',
'income_stat_operations_taxes_and_surcharges',
'income_stat_sales_fee',
'income_stat_manage_fee',
'income_stat_financial_expenses',
'income_stat_impairment_loss_on_assets',
'income_stat_fv_chg_income',
'income_stat_investment_income',
'income_stat_profit_from_operations',
'income_stat_profit_before_tax',
'income_stat_minus_income_tax_expenses',
'income_stat_net_profit'
]
df_common = df[cols].T
df_common.columns = ['amount']
df_common['common_size'] = df_common['amount'] / base
df_common = df_common[['common_size']]
col_map = {
"income_stat_operating_income": "营业收入",
"income_stat_operations_costs": "营业成本",
"income_stat_operations_taxes_and_surcharges": "营业税金及附加",
"income_stat_sales_fee": "销售费用",
"income_stat_manage_fee": "管理费用",
"income_stat_financial_expenses": "财务费用",
"income_stat_impairment_loss_on_assets": "资产减值损失",
"income_stat_fv_chg_income": "公允价值变动收益",
"income_stat_investment_income": "投资收益",
"income_stat_profit_from_operations": "营业利润",
"income_stat_profit_before_tax": "利润总额",
"income_stat_minus_income_tax_expenses": "所得税费用",
"income_stat_net_profit": "净利润"
}
df_common = df_common.rename(index=col_map)
return df_common
def filter_financials(df_single, filters=None):
"""
根据给定过滤条件筛选财务同型分析结果。
参数:
----------
df_single : pd.DataFrame
单只股票的同型分析结果(来自 txfx)
filters : dict
过滤条件字典,例如:
{
"净利润": (">", 0),
"营业收入": (">", 1e8),
"财务费用": ("<", 0.1)
}
返回:
----------
bool
True 表示通过过滤(保留)
False 表示被过滤掉
"""
if df_single is None or df_single.empty:
return False
if not filters:
return True # 没有过滤条件则默认保留
for metric, (op, value) in filters.items():
if metric not in df_single.index:
continue # 跳过不存在的指标
val = df_single.loc[metric, "common_size"]
if op == ">" and not (val > value):
return False
elif op == ">=" and not (val >= value):
return False
elif op == "<" and not (val < value):
return False
elif op == "<=" and not (val <= value):
return False
elif op == "==" and not (val == value):
return False
elif op == "!=" and not (val != value):
return False
return True
def txfx_industry(code, category_type='ci', date='20230801', statDate='2025q1', filters=None):
"""
计算指定股票及其行业成分股的同行财务同型分析。
增加统一日志输出 & 可配置过滤。
"""
industry_stocks = get_stock_industry_stocks(code, category_type=category_type, date=date)
if not industry_stocks:
print("未找到行业成分股。")
return None
print(f"\n【行业成分股数量】{len(industry_stocks)} 支\n")
all_results = []
log_records = [] # 日志记录表
for s in industry_stocks:
status = "成功"
reason = ""
try:
df_single = txfx(s, statDate=statDate)
if df_single is None:
status = "失败"
reason = "无数据"
else:
# 过滤逻辑
if not filter_financials(df_single, filters):
status = "过滤"
reason = "不符合过滤条件"
else:
df_single.columns = [s]
all_results.append(df_single)
except Exception as e:
status = "失败"
reason = str(e)
log_records.append({
"股票代码": s,
"状态": status,
"原因": reason
})
# 🔹 输出统一日志
import pandas as pd
log_df = pd.DataFrame(log_records)
print("【行业成分股处理日志】")
print(log_df.to_string(index=False))
# 🔹 检查有效样本
if not all_results:
print("\n❌ 所有成分股数据获取失败或均被过滤。")
return None
df_all = pd.concat(all_results, axis=1)
# 🔹 统计汇总
df_summary = pd.DataFrame({
'平均数': df_all.mean(axis=1),
'中位数': df_all.median(axis=1),
'最大值': df_all.max(axis=1),
'最小值': df_all.min(axis=1)
})
# 🔹 加上本公司数据
print(f"\n【提取 {code} 的财务同型分析数据...】")
df_target = txfx(code, statDate=statDate)
if df_target is not None:
df_target.columns = ['本公司']
df_summary = pd.concat([df_summary, df_target], axis=1)
else:
print(f"{code} 的财务数据获取失败。")
# 🔹 输出结果
print("\n【行业同型分析汇总】")
print(df_summary)
# print(df_summary.applymap(lambda x: f"{x:.2%}"))
return df_summary, log_df