import QUANTAXIS as QA
import pandas as pd
import numpy as np
# 配置数据
t = QA.QA_fetch_stock_day_adv('000948','2021-07-20','2021-09-01').close
print(t.index)
t
def MA(Series, N):
return pd.Series.rolling(Series, N).mean()
QA.MA(t,5)
def EMA(Series, N):
return pd.Series.ewm(Series, span=N, min_periods=N - 1, adjust=True).mean()
QA.EMA(t,12)
def SMA(Series, N, M=1):
"""
威廉SMA算法
本次修正主要是对于返回值的优化,现在的返回值会带上原先输入的索引index
2018/5/3
@yutiansut
"""
ret = []
# 循环是从 1 开始(而不是从 0 开始)
i = 1
length = len(Series)
# 跳过X中前面几个 nan 值
# 如果遇到空值,跳过,进入下一个 i
while i < length:
if np.isnan(Series.iloc[i]):
i += 1
else:
break
# 前值,就使用非空的当前第一个值
preY = Series.iloc[i] # Y'
# 追加到 list 中
ret.append(preY)
while i < length:
# M/N 作为平滑系数,加在当前值之上, 而 (N - M)/N ,加在前值之上
Y = (M * Series.iloc[i] + (N - M) * preY) / float(N)
# 最新值追加到list尾部
ret.append(Y)
# 最新值赋值给前值,用于下一次计算(在 i+1 之后,产生新的最新值)
preY = Y
i += 1
return pd.Series(ret, index=Series.tail(len(ret)).index)
SMA(t, 5, M=0.75)
# 当前值与前 N 个位置的值得差
def DIFF(Series, N=1):
return pd.Series(Series).diff(N)
DIFF(t,N=1)
# rolling表示滑动窗口,即滑动N个位置,作为窗口,计算 max()
def HHV(Series, N):
return pd.Series(Series).rolling(N).max()
HHV(t,2)
# rolling表示滑动窗口,即滑动N个位置,作为窗口,计算 min()
def LLV(Series, N):
return pd.Series(Series).rolling(N).min()
LLV(t,3)
# rolling表示滑动窗口,即滑动N个位置,作为窗口,计算 sum()
def SUM(Series, N):
return pd.Series.rolling(Series, N).sum()
SUM(t,3)
# 取绝对值,消除方向性带来的误差
def ABS(Series):
return abs(Series)
t2=DIFF(t,N=1)
print(t2)
ABS(t2)
# 对 Series 的值逐一比较
def MAX(A, B):
var = IF(A > B, A, B)
return var
t2=DIFF(t,N=1)
t3=ABS(t2)
MAX(t2,t3)
def MIN(A, B):
var = IF(A < B, A, B)
return var
t2=DIFF(t,N=1)
t3=ABS(t2)
MIN(t2,t3)
# diff() 缺省 N=1,计算前后 1 个位置的差。
def CROSS(A, B):
"""A<B then A>B A上穿B B下穿A
Arguments:
A {[type]} -- [description]
B {[type]} -- [description]
Returns:
[type] -- [description]
"""
var = np.where(A < B, 1, 0)
print('var is: ', var)
try:
index = A.index
except:
index = B.index
# 按照 var 以及 1 个位移的差值,如果小于 0 ,返回 1,否则返回 0,以 int 形式呈现
return (pd.Series(var, index=index).diff() < 0).apply(int)
# return pd.Series(var, index=index).diff()
t2=DIFF(t,N=1)
#print(t2)
t3=ABS(t2)
#print(t3)
CROSS(t2,t3)
def COUNT(COND, N):
"""
2018/05/23 修改
参考https://github.com/QUANTAXIS/QUANTAXIS/issues/429
现在返回的是series
"""
# COND 是序列 A 与序列 B 的比较
# 满足条件时,返回 1 ,否则返回 0
# 然后计算前 N 个位置的位移窗口,并加总
return pd.Series(np.where(COND, 1, 0), index=COND.index).rolling(N).sum()
t2=DIFF(t,N=1)
t3=ABS(t2)
COUNT(t2<t3, 2)
# 对于 序列的 IF 判断,有一个隐含条件:
"""
and:
T and T = T
T and F = F
F and T = F
F and F = F
or :
T or T = T
T or F = T
F or T = T
F or F = F
"""
def IF(COND, V1, V2):
# 此处 COND 的序列 比较。属于 and ...
var = np.where(COND, V1, V2)
# print('=----------> var is: \n', var)
try:
try:
index = V1.index
except:
index = COND.index
except:
index = V2.index
return pd.Series(var, index=index)
t2=DIFF(t,N=1)
t3=ABS(t2)
print('>>>>>>>>>> COND is: \n', t2>t3)
IF(t2>t3, t2, t3)
# 对数据进行移动 N 个位置
def REF(Series, N):
return Series.shift(N)
REF(t,3)
def STD(Series, N):
return pd.Series.rolling(Series, N).std()
STD(t,3)
def AVEDEV(Series, N):
"""
平均绝对偏差 mean absolute deviation
修正: 2018-05-25
之前用mad的计算模式依然返回的是单值
"""
return Series.rolling(N).apply(lambda x: (np.abs(x - x.mean())).mean(), raw=True)
AVEDEV(t,5)
def BBI(Series, N1, N2, N3, N4):
'多空指标'
bbi = (MA(Series, N1) + MA(Series, N2) +
MA(Series, N3) + MA(Series, N4)) / 4
DICT = {'BBI': bbi}
VAR = pd.DataFrame(DICT)
# print('=----------> VAR is: \n', VAR)
return VAR
def BBIBOLL(Series, N1, N2, N3, N4, N, M): # 多空布林线
bbiboll = BBI(Series, N1, N2, N3, N4)
# print('=----------> bbiboll is: \n', bbiboll)
UPER = bbiboll + M * STD(bbiboll, N)
DOWN = bbiboll - M * STD(bbiboll, N)
DICT = {'BBIBOLL': bbiboll, 'UPER': UPER, 'DOWN': DOWN}
# print('=----------> type(DICT) is: \n', type(DICT), '\n OVER')
VAR = pd.DataFrame(DICT, index=[0])
# VAR = pd.DataFrame(DICT)
# print('=----------> VAR is: \n', VAR)
return VAR
BBIBOLL(t, 3, 6, 12, 24, 20, 3)
# 演示数据
tt = QA.QA_fetch_stock_day_adv('000948','2021-05-01','2021-09-12').to_qfq()
print('type(tt) is: \n', type(tt))
print('type(tt.data) is: \n', type(tt.data))
tt.data
def QA_indicator_OSC(DataFrame, N=20, M=6):
"""变动速率线
震荡量指标OSC,也叫变动速率线。属于超买超卖类指标,是从移动平均线原理派生出来的一种分析指标。
它反应当日收盘价与一段时间内平均收盘价的差离值,从而测出股价的震荡幅度。
按照移动平均线原理,根据OSC的值可推断价格的趋势,如果远离平均线,就很可能向平均线回归。
"""
C = DataFrame['close']
# 两个序列(当日收盘价序列,收盘价的N日收盘价平均价序列)的差值
OS = (C - MA(C, N)) * 100
# 再计算 M 日的指数移动平均价
# 指数移动平均指标,是一种移动平均指标,它给予最近的数据以更大的权重和重要性
MAOSC = EMA(OS, M)
DICT = {'OSC': OS, 'MAOSC': MAOSC}
return pd.DataFrame(DICT)
QA_indicator_OSC(tt,20,6)
def QA_indicator_BBI(DataFrame, N1=3, N2=6, N3=12, N4=24):
'多空指标'
C = DataFrame['close']
# 分别计算四个窗口的均值,在计算四个值得平均数
bbi = (MA(C, N1) + MA(C, N2) + MA(C, N3) + MA(C, N4)) / 4
DICT = {'BBI': bbi}
return pd.DataFrame(DICT)
QA_indicator_BBI(tt, 3, 6, 12, 24)
def QA_indicator_PBX(DataFrame, N1=3, N2=5, N3=8, N4=13, N5=18, N6=24):
'瀑布线'
C = DataFrame['close']
PBX1 = (EMA(C, N1) + EMA(C, 2 * N1) + EMA(C, 4 * N1)) / 3
PBX2 = (EMA(C, N2) + EMA(C, 2 * N2) + EMA(C, 4 * N2)) / 3
PBX3 = (EMA(C, N3) + EMA(C, 2 * N3) + EMA(C, 4 * N3)) / 3
PBX4 = (EMA(C, N4) + EMA(C, 2 * N4) + EMA(C, 4 * N4)) / 3
PBX5 = (EMA(C, N5) + EMA(C, 2 * N5) + EMA(C, 4 * N5)) / 3
PBX6 = (EMA(C, N6) + EMA(C, 2 * N6) + EMA(C, 4 * N6)) / 3
DICT = {'PBX1': PBX1, 'PBX2': PBX2, 'PBX3': PBX3,
'PBX4': PBX4, 'PBX5': PBX5, 'PBX6': PBX6}
return pd.DataFrame(DICT)
QA_indicator_PBX(tt, 3, 5, 8, 13, 18, 24)
def QA_indicator_BOLL(DataFrame, N=20, P=2):
'布林线'
C = DataFrame['close']
# 均线
boll = MA(C, N)
# 上轨
UB = boll + P * STD(C, N)
# 下轨
LB = boll - P * STD(C, N)
DICT = {'BOLL': boll, 'UB': UB, 'LB': LB}
return pd.DataFrame(DICT)
QA_indicator_BOLL(tt, N=20, P=2)
def QA_indicator_ROC(DataFrame, N=12, M=6):
'变动率指标'
C = DataFrame['close']
# REF(N) 是对数据进行移动 N 个位置
# 当前价格与前N的位置的价格之差,再除以前N位置的价格,计算出比率
roc = 100 * (C - REF(C, N)) / REF(C, N)
# 计算 M 日 该比率的均值
ROCMA = MA(roc, M)
DICT = {'ROC': roc, 'ROCMA': ROCMA}
return pd.DataFrame(DICT)
QA_indicator_ROC(tt, N=12, M=6)
def QA_indicator_MTM(DataFrame, N=12, M=6):
'动量线'
C = DataFrame.close
# REF(N) 是对数据进行移动 N 个位置
# 先计算当前价格与前 N 个位移价格的差
mtm = C - REF(C, N)
# 再计算 M 个该差的平均值
MTMMA = MA(mtm, M)
DICT = {'MTM': mtm, 'MTMMA': MTMMA}
return pd.DataFrame(DICT)
QA_indicator_MTM(tt, N=12, M=6)
def QA_indicator_KDJ(DataFrame, N=9, M1=3, M2=3):
C = DataFrame['close']
H = DataFrame['high']
L = DataFrame['low']
RSV = ((C - LLV(L, N)) / (HHV(H, N) - LLV(L, N)) * 100).groupby('code').fillna(method='ffill')
K = SMA(RSV, M1)
D = SMA(K, M2)
J = 3 * K - 2 * D
DICT = {'KDJ_K': K, 'KDJ_D': D, 'KDJ_J': J}
return pd.DataFrame(DICT)
QA_indicator_KDJ(tt, N=9, M1=3, M2=3)
def QA_indicator_MFI(DataFrame, N=14):
"""
资金指标
TYP := (HIGH + LOW + CLOSE)/3;
V1:=SUM(IF(TYP>REF(TYP,1),TYP*VOL,0),N)/SUM(IF(TYP<REF(TYP,1),TYP*VOL,0),N); # “:=”表示定义为是编程语言里的赋值语句的符号,用来定义一个新出现的符号
MFI:100-(100/(1+V1));
赋值: (最高价 + 最低价 + 收盘价)/3
V1赋值: 如果 TYP > 1日前的TYP, 返回 TYP*成交量(手), 否则返回 0 的 N 日累和 / 如果 TYP < 1日前的 TYP, 返回TYP*成交量(手), 否则返回0 的 N日累和
输出资金流量指标:100-(100/(1+V1))
"""
C = DataFrame['close']
H = DataFrame['high']
L = DataFrame['low']
VOL = DataFrame['volume']
TYP = (C + H + L) / 3
V1 = SUM(IF(TYP > REF(TYP, 1), TYP * VOL, 0), N) / \
SUM(IF(TYP < REF(TYP, 1), TYP * VOL, 0), N)
mfi = 100 - (100 / (1 + V1))
DICT = {'MFI': mfi}
return pd.DataFrame(DICT)
QA_indicator_MFI(tt, N=14)
def QA_indicator_ATR(DataFrame, N=14):
"""
输出TR: (最高价-最低价) 和 昨收-最高价 的绝对值的较大值 和 昨收-最低价 的绝对值的较大值
输出真实波幅: TR 的 N 日简单移动平均
算法:今日振幅、今日最高与昨收差价、今日最低与昨收差价中的最大值,为真实波幅,求真实波幅的N日移动平均
参数:N 天数,一般取14
"""
C = DataFrame['close']
H = DataFrame['high']
L = DataFrame['low']
TR = MAX(MAX((H - L), ABS(REF(C, 1) - H)), ABS(REF(C, 1) - L))
atr = MA(TR, N)
return pd.DataFrame({'TR': TR, 'ATR': atr})
QA_indicator_ATR(tt, N=14)
def QA_indicator_SKDJ(DataFrame, N=9, M=3):
"""
1.指标>80 时,回档机率大;指标<20 时,反弹机率大;
2.K在20左右向上交叉D时,视为买进信号参考;
3.K在80左右向下交叉D时,视为卖出信号参考;
4.SKDJ波动于50左右的任何讯号,其作用不大。
"""
CLOSE = DataFrame['close']
# N 个单位窗口的最小值
LOWV = LLV(DataFrame['low'], N)
# N 个单位窗口的最大值
HIGHV = HHV(DataFrame['high'], N)
# 计算 收盘价与最低价、最高价与收盘价的差,的比率,的 M 日指数移动平均值
RSV = EMA((CLOSE - LOWV) / (HIGHV - LOWV) * 100, M)
# 再次计算 上述指数移动均值的 M 日指数移动均值
K = EMA(RSV, M)
# 再次计算 上述 指数移动均值 的 均值
D = MA(K, M)
DICT = {'RSV': RSV, 'SKDJ_K': K, 'SKDJ_D': D}
return pd.DataFrame(DICT)
QA_indicator_SKDJ(tt, N=9, M=3)
def QA_indicator_WR(DataFrame, N, N1):
'威廉指标'
HIGH = DataFrame['high']
LOW = DataFrame['low']
CLOSE = DataFrame['close']
# 天线
WR1 = 100 * (HHV(HIGH, N) - CLOSE) / (HHV(HIGH, N) - LLV(LOW, N))
# 地线
WR2 = 100 * (HHV(HIGH, N1) - CLOSE) / (HHV(HIGH, N1) - LLV(LOW, N1))
DICT = {'WR1': WR1, 'WR2': WR2}
return pd.DataFrame(DICT)
QA_indicator_WR(tt, 5, 15)
def QA_indicator_BIAS(DataFrame, N1, N2, N3):
'乖离率'
# 先取收盘价
CLOSE = DataFrame['close']
# 计算 收盘价,与 N1 日的均线的差,除以 N1 日的收盘价均值 后的比率
BIAS1 = (CLOSE - MA(CLOSE, N1)) / MA(CLOSE, N1) * 100
BIAS2 = (CLOSE - MA(CLOSE, N2)) / MA(CLOSE, N2) * 100
BIAS3 = (CLOSE - MA(CLOSE, N3)) / MA(CLOSE, N3) * 100
DICT = {'BIAS1': BIAS1, 'BIAS2': BIAS2, 'BIAS3': BIAS3}
return pd.DataFrame(DICT)
QA_indicator_BIAS(tt, 3, 6, 12)
def QA_indicator_RSI(DataFrame, N1=12, N2=26, N3=9):
'相对强弱指标RSI1:SMA(MAX(CLOSE-LC,0),N1,1)/SMA(ABS(CLOSE-LC),N1,1)*100;'
CLOSE = DataFrame['close']
LC = REF(CLOSE, 1)
RSI1 = SMA(MAX(CLOSE - LC, 0), N1) / SMA(ABS(CLOSE - LC), N1) * 100
RSI2 = SMA(MAX(CLOSE - LC, 0), N2) / SMA(ABS(CLOSE - LC), N2) * 100
RSI3 = SMA(MAX(CLOSE - LC, 0), N3) / SMA(ABS(CLOSE - LC), N3) * 100
DICT = {'RSI1': RSI1, 'RSI2': RSI2, 'RSI3': RSI3}
return pd.DataFrame(DICT)
QA_indicator_RSI(tt, N1=12, N2=26, N3=9)
def QA_indicator_ADTM(DataFrame, N=23, M=8):
'动态买卖气指标'
HIGH = DataFrame.high
LOW = DataFrame.low
OPEN = DataFrame.open
# 如果开盘价大于昨日开盘价,取 今日最高价与开盘价之差,与今日开盘价与昨日开盘价之差,中的最大值
DTM = IF(OPEN > REF(OPEN, 1), MAX((HIGH - OPEN), (OPEN - REF(OPEN, 1))), 0)
# 如果开盘价小于昨日开盘价,取 今日开盘价与最低价之差,与今日开盘价与昨日开盘价之差,中的最大值
DBM = IF(OPEN < REF(OPEN, 1), MAX((OPEN - LOW), (OPEN - REF(OPEN, 1))), 0)
# 计算的到 N 的合计数
STM = SUM(DTM, N)
SBM = SUM(DBM, N)
# 考虑三种情况:大于、不等于、等于,来分别计算差值率,以更大的值为分母
# 如若等于,取 0
ADTM1 = IF(STM > SBM, (STM - SBM) / STM,
IF(STM != SBM, (STM - SBM) / SBM, 0))
# 上述值 的 M 日均值
MAADTM = MA(ADTM1, M)
DICT = {'ADTM': ADTM1, 'MAADTM': MAADTM}
return pd.DataFrame(DICT)
QA_indicator_ADTM(tt, N=23, M=8)
def QA_indicator_DDI(DataFrame, N=13, N1=26, M=1, M1=5):
"""
'方向标准离差指数'
分析DDI柱状线,由红变绿(正变负),卖出信号参考;由绿变红,买入信号参考。
"""
H = DataFrame['high']
L = DataFrame['low']
DMZ = IF((H + L) > (REF(H, 1) + REF(L, 1)), MAX(ABS(H - REF(H, 1)), ABS(L - REF(L, 1))), 0)
DMF = IF((H + L) < (REF(H, 1) + REF(L, 1)), MAX(ABS(H - REF(H, 1)), ABS(L - REF(L, 1))), 0)
DIZ = SUM(DMZ, N) / (SUM(DMZ, N) + SUM(DMF, N))
DIF = SUM(DMF, N) / (SUM(DMF, N) + SUM(DMZ, N))
ddi = DIZ - DIF
ADDI = SMA(ddi, N1, M)
AD = MA(ADDI, M1)
DICT = {'DDI': ddi, 'ADDI': ADDI, 'AD': AD}
# DICT = {'DMZ': DMZ, 'DMF': DMF, 'AD': AD} # 虽然公式相同,值却不同
return pd.DataFrame(DICT)
QA_indicator_DDI(tt, N=13, N1=26, M=1, M1=5)
def QA_indicator_CCI(DataFrame, N=14):
"""
TYP:=(HIGH+LOW+CLOSE)/3;
CCI:(TYP-MA(TYP,N))/(0.015*AVEDEV(TYP,N));
"""
typ = (DataFrame['high'] + DataFrame['low'] + DataFrame['close']) / 3
## 此处AVEDEV可能为0值 因此导致出错 +0.0000000000001
cci = ((typ - MA(typ, N)) / (0.015 * AVEDEV(typ, N) + 0.00000001))
a = 100
b = -100
return pd.DataFrame({
'CCI': cci, 'a': a, 'b': b
})
QA_indicator_CCI(tt, N=14)
def JLHB(data, m=7, n=5):
"""
通达信定义
VAR1:=(CLOSE-LLV(LOW,60))/(HHV(HIGH,60)-LLV(LOW,60))*80;
B:SMA(VAR1,N,1);
VAR2:SMA(B,M,1);
绝路航标:IF(CROSS(B,VAR2) AND B<40,50,0);
"""
var1 = (data['close'] - QA.LLV(data['low'], 60)) / \
(QA.HHV(data['high'], 60) - QA.LLV(data['low'], 60)) * 80
B = QA.SMA(var1, m)
var2 = QA.SMA(B, n)
return pd.DataFrame({'JLHB':QA.CROSS(B,var2)*(B<40)})
QA.QA_fetch_stock_day_adv('000001','2017-01-01','2017-05-31').to_qfq()
QA.QA_fetch_stock_day_adv('000001','2017-01-01','2017-05-31').to_qfq().data
QA.QA_fetch_stock_day_adv('000001','2017-01-01','2017-05-31').to_qfq().add_func(JLHB)
def MACD_JCSC(dataframe, SHORT=12, LONG=26, M=9):
"""
1.DIF向上突破DEA,买入信号参考。
2.DIF向下跌破DEA,卖出信号参考。
"""
CLOSE = dataframe.close
DIFF = QA.EMA(CLOSE, SHORT) - QA.EMA(CLOSE, LONG)
DEA = QA.EMA(DIFF, M)
MACD = 2*(DIFF-DEA)
CROSS_JC = QA.CROSS(DIFF, DEA)
CROSS_SC = QA.CROSS(DEA, DIFF)
ZERO = 0
return pd.DataFrame({'DIFF': DIFF, 'DEA': DEA, 'MACD': MACD, 'CROSS_JC': CROSS_JC, 'CROSS_SC': CROSS_SC, 'ZERO': ZERO})
QA.QA_fetch_stock_day_adv('000948','2021-01-01','2021-09-10').to_qfq().add_func(MACD_JCSC)
ind=tt.add_func(QA.QA_indicator_WR,1,2)
inc=QA.QA_DataStruct_Indicators(ind)
inc.data
inc.get_timerange('2021-09-07','2021-09-12','000948')
inc.get_code('000948')