"""
Portfolio-lib - Lightweight Python Backtesting Library
A comprehensive backtesting framework for algorithmic trading strategies
"""
import pandas as pd
import numpy as np
import yfinance as yf
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Union
import warnings
warnings.filterwarnings('ignore')
[docs]
class Position:
"""Represents a trading position"""
[docs]
def __init__(self, symbol: str, quantity: float, entry_price: float, timestamp: datetime = None):
self.symbol = symbol
self.quantity = quantity
self.shares = quantity # Alias for compatibility
self.entry_price = entry_price
self.price = entry_price # Alias for compatibility
self.timestamp = timestamp or datetime.now()
self.current_price = entry_price
@property
def market_value(self) -> float:
return self.quantity * self.current_price
@property
def unrealized_pnl(self) -> float:
return (self.current_price - self.entry_price) * self.quantity
@property
def unrealized_pnl_pct(self) -> float:
if self.entry_price == 0:
return 0.0
return (self.current_price - self.entry_price) / self.entry_price * 100
@property
def value(self) -> float:
"""Alias for market_value for compatibility"""
return self.market_value
[docs]
class Trade:
"""Represents a completed trade"""
[docs]
def __init__(self, symbol: str, quantity: float, price: float, timestamp: datetime, action: str = None, side: str = None, commission: float = 0.0):
self.symbol = symbol
# Support both action (BUY/SELL) and side (buy/sell) parameters
if action is not None:
self.action = action.upper()
self.side = action.lower()
elif side is not None:
self.side = side.lower()
self.action = side.upper()
else:
self.side = 'buy'
self.action = 'BUY'
self.quantity = quantity
self.shares = quantity # Alias for compatibility
self.price = price
self.timestamp = timestamp
self.commission = commission
@property
def gross_value(self) -> float:
return self.quantity * self.price
@property
def net_value(self) -> float:
return self.gross_value - self.commission
[docs]
class Portfolio:
"""Portfolio management class"""
[docs]
def __init__(self, initial_cash: float = 100000.0):
self.initial_cash = initial_cash
self.cash = initial_cash
self.positions: Dict[str, Position] = {}
self.trades: List[Trade] = []
self.equity_curve: List[float] = []
self.timestamps: List[datetime] = []
[docs]
def add_trade(self, trade: Trade):
"""Add a trade to the portfolio"""
self.trades.append(trade)
# Check if it's a buy trade (handle both side and action attributes)
is_buy = (hasattr(trade, 'side') and trade.side == 'buy') or (hasattr(trade, 'action') and trade.action == 'BUY')
if is_buy:
self.cash -= trade.net_value
if trade.symbol in self.positions:
# Average cost basis for additional shares
existing = self.positions[trade.symbol]
total_quantity = existing.quantity + trade.quantity
avg_price = ((existing.quantity * existing.entry_price) +
(trade.quantity * trade.price)) / total_quantity
existing.quantity = total_quantity
existing.entry_price = avg_price
else:
self.positions[trade.symbol] = Position(
trade.symbol, trade.quantity, trade.price, trade.timestamp
)
else: # sell
self.cash += trade.net_value
if trade.symbol in self.positions:
position = self.positions[trade.symbol]
position.quantity -= trade.quantity
if position.quantity <= 0:
del self.positions[trade.symbol]
[docs]
def update_prices(self, prices: Dict[str, float], timestamp: datetime):
"""Update current prices for all positions"""
for symbol, position in self.positions.items():
if symbol in prices:
position.current_price = prices[symbol]
# Record equity curve
total_equity = self.total_equity
self.equity_curve.append(total_equity)
self.timestamps.append(timestamp)
@property
def total_equity(self) -> float:
"""Calculate total portfolio equity"""
positions_value = sum(pos.market_value for pos in self.positions.values())
return self.cash + positions_value
@property
def total_return(self) -> float:
"""Calculate total return percentage"""
if self.initial_cash == 0:
return 0.0
return (self.total_equity - self.initial_cash) / self.initial_cash * 100
@property
def total_value(self) -> float:
"""Alias for total_equity for compatibility"""
return self.total_equity
[docs]
class DataFeed:
"""Base class for data feeds"""
[docs]
def __init__(self, symbols: List[str]):
self.symbols = symbols
self.data: Dict[str, pd.DataFrame] = {}
[docs]
def load_data(self, start_date: str, end_date: str):
"""Load data for the specified date range"""
raise NotImplementedError
[docs]
class YFinanceDataFeed(DataFeed):
"""Yahoo Finance data feed"""
[docs]
def load_data(self, start_date: str, end_date: str):
"""Load data from Yahoo Finance"""
for symbol in self.symbols:
try:
ticker = yf.Ticker(symbol)
data = ticker.history(start=start_date, end=end_date)
if not data.empty:
self.data[symbol] = data
else:
print(f"Warning: No data found for {symbol}")
except Exception as e:
print(f"Error loading data for {symbol}: {e}")
[docs]
def fetch_data(self, start_date: str, end_date: str) -> Dict[str, pd.DataFrame]:
"""Alias for load_data that returns the data directly"""
self.load_data(start_date, end_date)
return self.data
class BaseIndicator:
"""Base class for technical indicators"""
def __init__(self, period: int):
self.period = period
self.values: List[float] = []
def update(self, value: float):
"""Update indicator with new value"""
self.values.append(value)
if len(self.values) > self.period * 2: # Keep some history
self.values = self.values[-self.period * 2:]
@property
def value(self) -> Optional[float]:
"""Get current indicator value"""
if len(self.values) < self.period:
return None
return self._calculate()
def _calculate(self) -> float:
"""Calculate indicator value - to be implemented by subclasses"""
raise NotImplementedError
class SMA(BaseIndicator):
"""Simple Moving Average"""
def _calculate(self) -> float:
return np.mean(self.values[-self.period:])
class EMA(BaseIndicator):
"""Exponential Moving Average"""
def __init__(self, period: int):
super().__init__(period)
self.alpha = 2 / (period + 1)
self.ema_value = None
def _calculate(self) -> float:
if self.ema_value is None:
self.ema_value = np.mean(self.values[-self.period:])
else:
self.ema_value = self.alpha * self.values[-1] + (1 - self.alpha) * self.ema_value
return self.ema_value
class RSI(BaseIndicator):
"""Relative Strength Index"""
def _calculate(self) -> float:
if len(self.values) < self.period + 1:
return 50.0
deltas = np.diff(self.values[-self.period-1:])
gains = np.where(deltas > 0, deltas, 0)
losses = np.where(deltas < 0, -deltas, 0)
avg_gain = np.mean(gains)
avg_loss = np.mean(losses)
if avg_loss == 0:
return 100.0
rs = avg_gain / avg_loss
rsi = 100 - (100 / (1 + rs))
return rsi
class MACD:
"""Moving Average Convergence Divergence"""
def __init__(self, fast_period: int = 12, slow_period: int = 26, signal_period: int = 9):
self.fast_ema = EMA(fast_period)
self.slow_ema = EMA(slow_period)
self.signal_ema = EMA(signal_period)
self.macd_values: List[float] = []
def update(self, value: float):
self.fast_ema.update(value)
self.slow_ema.update(value)
if self.fast_ema.value is not None and self.slow_ema.value is not None:
macd_value = self.fast_ema.value - self.slow_ema.value
self.macd_values.append(macd_value)
self.signal_ema.update(macd_value)
@property
def macd(self) -> Optional[float]:
return self.macd_values[-1] if self.macd_values else None
@property
def signal(self) -> Optional[float]:
return self.signal_ema.value
@property
def histogram(self) -> Optional[float]:
if self.macd is not None and self.signal is not None:
return self.macd - self.signal
return None
class BollingerBands:
"""Bollinger Bands indicator"""
def __init__(self, period: int = 20, std_dev: float = 2.0):
self.period = period
self.std_dev = std_dev
self.values: List[float] = []
def update(self, value: float):
self.values.append(value)
if len(self.values) > self.period * 2:
self.values = self.values[-self.period * 2:]
@property
def middle_band(self) -> Optional[float]:
if len(self.values) < self.period:
return None
return np.mean(self.values[-self.period:])
@property
def upper_band(self) -> Optional[float]:
middle = self.middle_band
if middle is None or len(self.values) < self.period:
return None
std = np.std(self.values[-self.period:])
return middle + (self.std_dev * std)
@property
def lower_band(self) -> Optional[float]:
middle = self.middle_band
if middle is None or len(self.values) < self.period:
return None
std = np.std(self.values[-self.period:])
return middle - (self.std_dev * std)
# ============================================================================
# TREND INDICATORS
# ============================================================================
class WMA(BaseIndicator):
"""Weighted Moving Average"""
def _calculate(self) -> float:
weights = np.arange(1, self.period + 1)
return np.average(self.values[-self.period:], weights=weights)
class DEMA(BaseIndicator):
"""Double Exponential Moving Average"""
def __init__(self, period: int):
super().__init__(period)
self.ema1 = EMA(period)
self.ema2 = EMA(period)
def update(self, value: float):
super().update(value)
self.ema1.update(value)
if self.ema1.value is not None:
self.ema2.update(self.ema1.value)
def _calculate(self) -> float:
if self.ema1.value is None or self.ema2.value is None:
return np.mean(self.values[-self.period:])
return 2 * self.ema1.value - self.ema2.value
class TEMA(BaseIndicator):
"""Triple Exponential Moving Average"""
def __init__(self, period: int):
super().__init__(period)
self.ema1 = EMA(period)
self.ema2 = EMA(period)
self.ema3 = EMA(period)
def update(self, value: float):
super().update(value)
self.ema1.update(value)
if self.ema1.value is not None:
self.ema2.update(self.ema1.value)
if self.ema2.value is not None:
self.ema3.update(self.ema2.value)
def _calculate(self) -> float:
if None in [self.ema1.value, self.ema2.value, self.ema3.value]:
return np.mean(self.values[-self.period:])
return 3 * self.ema1.value - 3 * self.ema2.value + self.ema3.value
class KAMA(BaseIndicator):
"""Kaufman Adaptive Moving Average"""
def __init__(self, period: int = 10, fast_sc: int = 2, slow_sc: int = 30):
super().__init__(period)
self.fast_sc = 2 / (fast_sc + 1)
self.slow_sc = 2 / (slow_sc + 1)
self.kama_value = None
def _calculate(self) -> float:
if len(self.values) < self.period + 1:
return np.mean(self.values)
change = abs(self.values[-1] - self.values[-self.period])
volatility = sum(abs(self.values[i] - self.values[i-1]) for i in range(-self.period + 1, 0))
if volatility == 0:
er = 1
else:
er = change / volatility
sc = (er * (self.fast_sc - self.slow_sc) + self.slow_sc) ** 2
if self.kama_value is None:
self.kama_value = self.values[-1]
else:
self.kama_value = self.kama_value + sc * (self.values[-1] - self.kama_value)
return self.kama_value
class HullMA(BaseIndicator):
"""Hull Moving Average"""
def __init__(self, period: int):
super().__init__(period)
self.wma_half = WMA(period // 2)
self.wma_full = WMA(period)
self.wma_sqrt = WMA(int(np.sqrt(period)))
self.hull_values = []
def update(self, value: float):
super().update(value)
self.wma_half.update(value)
self.wma_full.update(value)
if self.wma_half.value is not None and self.wma_full.value is not None:
hull_value = 2 * self.wma_half.value - self.wma_full.value
self.hull_values.append(hull_value)
if len(self.hull_values) > int(np.sqrt(self.period)) * 2:
self.hull_values = self.hull_values[-int(np.sqrt(self.period)) * 2:]
self.wma_sqrt.update(hull_value)
def _calculate(self) -> float:
return self.wma_sqrt.value if self.wma_sqrt.value is not None else np.mean(self.values[-self.period:])
class VWAP:
"""Volume Weighted Average Price"""
def __init__(self):
self.price_volume = 0
self.volume_sum = 0
self.values = []
def update(self, price: float, volume: float):
self.price_volume += price * volume
self.volume_sum += volume
if self.volume_sum > 0:
self.values.append(self.price_volume / self.volume_sum)
else:
self.values.append(price)
@property
def value(self) -> Optional[float]:
return self.values[-1] if self.values else None
class ParabolicSAR:
"""Parabolic Stop and Reverse"""
def __init__(self, af_start: float = 0.02, af_increment: float = 0.02, af_max: float = 0.2):
self.af_start = af_start
self.af_increment = af_increment
self.af_max = af_max
self.sar = None
self.trend = 1 # 1 for up, -1 for down
self.af = af_start
self.ep = None # Extreme Point
self.highs = []
self.lows = []
def update(self, high: float, low: float, close: float):
self.highs.append(high)
self.lows.append(low)
if len(self.highs) < 2:
self.sar = low
self.ep = high
return
prev_sar = self.sar
if self.trend == 1: # Uptrend
self.sar = prev_sar + self.af * (self.ep - prev_sar)
if low <= self.sar:
self.trend = -1
self.sar = self.ep
self.ep = low
self.af = self.af_start
else:
if high > self.ep:
self.ep = high
self.af = min(self.af + self.af_increment, self.af_max)
else: # Downtrend
self.sar = prev_sar - self.af * (prev_sar - self.ep)
if high >= self.sar:
self.trend = 1
self.sar = self.ep
self.ep = high
self.af = self.af_start
else:
if low < self.ep:
self.ep = low
self.af = min(self.af + self.af_increment, self.af_max)
@property
def value(self) -> Optional[float]:
return self.sar
# ============================================================================
# MOMENTUM INDICATORS
# ============================================================================
class Stochastic:
"""Stochastic Oscillator"""
def __init__(self, k_period: int = 14, d_period: int = 3):
self.k_period = k_period
self.d_period = d_period
self.highs = []
self.lows = []
self.closes = []
self.k_values = []
self.d_sma = SMA(d_period)
def update(self, high: float, low: float, close: float):
self.highs.append(high)
self.lows.append(low)
self.closes.append(close)
if len(self.highs) > self.k_period * 2:
self.highs = self.highs[-self.k_period * 2:]
self.lows = self.lows[-self.k_period * 2:]
self.closes = self.closes[-self.k_period * 2:]
if len(self.closes) >= self.k_period:
highest_high = max(self.highs[-self.k_period:])
lowest_low = min(self.lows[-self.k_period:])
if highest_high - lowest_low != 0:
k_value = 100 * (close - lowest_low) / (highest_high - lowest_low)
else:
k_value = 50
self.k_values.append(k_value)
self.d_sma.update(k_value)
@property
def k(self) -> Optional[float]:
return self.k_values[-1] if self.k_values else None
@property
def d(self) -> Optional[float]:
return self.d_sma.value
class WilliamsR(BaseIndicator):
"""Williams %R"""
def __init__(self, period: int = 14):
super().__init__(period)
self.highs = []
self.lows = []
def update_hlc(self, high: float, low: float, close: float):
self.highs.append(high)
self.lows.append(low)
self.values.append(close)
if len(self.values) > self.period * 2:
self.highs = self.highs[-self.period * 2:]
self.lows = self.lows[-self.period * 2:]
self.values = self.values[-self.period * 2:]
def _calculate(self) -> float:
if len(self.values) < self.period:
return -50
highest_high = max(self.highs[-self.period:])
lowest_low = min(self.lows[-self.period:])
close = self.values[-1]
if highest_high - lowest_low != 0:
return -100 * (highest_high - close) / (highest_high - lowest_low)
return -50
class CCI(BaseIndicator):
"""Commodity Channel Index"""
def __init__(self, period: int = 20):
super().__init__(period)
self.highs = []
self.lows = []
self.closes = []
self.typical_prices = []
def update_hlc(self, high: float, low: float, close: float):
self.highs.append(high)
self.lows.append(low)
self.closes.append(close)
typical_price = (high + low + close) / 3
self.typical_prices.append(typical_price)
if len(self.typical_prices) > self.period * 2:
self.typical_prices = self.typical_prices[-self.period * 2:]
def _calculate(self) -> float:
if len(self.typical_prices) < self.period:
return 0
tp_sma = np.mean(self.typical_prices[-self.period:])
mean_deviation = np.mean([abs(tp - tp_sma) for tp in self.typical_prices[-self.period:]])
if mean_deviation == 0:
return 0
return (self.typical_prices[-1] - tp_sma) / (0.015 * mean_deviation)
class ROC(BaseIndicator):
"""Rate of Change"""
def _calculate(self) -> float:
if len(self.values) < self.period + 1:
return 0
return ((self.values[-1] - self.values[-self.period - 1]) / self.values[-self.period - 1]) * 100
class Momentum(BaseIndicator):
"""Momentum"""
def _calculate(self) -> float:
if len(self.values) < self.period + 1:
return 0
return self.values[-1] - self.values[-self.period - 1]
class StochasticRSI:
"""Stochastic RSI"""
def __init__(self, rsi_period: int = 14, stoch_period: int = 14):
self.rsi = RSI(rsi_period)
self.stoch_period = stoch_period
self.rsi_values = []
def update(self, value: float):
self.rsi.update(value)
if self.rsi.value is not None:
self.rsi_values.append(self.rsi.value)
if len(self.rsi_values) > self.stoch_period * 2:
self.rsi_values = self.rsi_values[-self.stoch_period * 2:]
@property
def value(self) -> Optional[float]:
if len(self.rsi_values) < self.stoch_period:
return None
rsi_high = max(self.rsi_values[-self.stoch_period:])
rsi_low = min(self.rsi_values[-self.stoch_period:])
current_rsi = self.rsi_values[-1]
if rsi_high - rsi_low != 0:
return 100 * (current_rsi - rsi_low) / (rsi_high - rsi_low)
return 50
class TRIX(BaseIndicator):
"""TRIX"""
def __init__(self, period: int = 14):
super().__init__(period)
self.ema1 = EMA(period)
self.ema2 = EMA(period)
self.ema3 = EMA(period)
self.trix_values = []
def update(self, value: float):
super().update(value)
self.ema1.update(value)
if self.ema1.value is not None:
self.ema2.update(self.ema1.value)
if self.ema2.value is not None:
self.ema3.update(self.ema2.value)
if len(self.trix_values) > 0:
trix = 10000 * (self.ema3.value - self.trix_values[-1]) / self.trix_values[-1]
self.trix_values.append(trix)
else:
self.trix_values.append(0)
def _calculate(self) -> float:
return self.trix_values[-1] if self.trix_values else 0
# ============================================================================
# VOLATILITY INDICATORS
# ============================================================================
class ATR:
"""Average True Range"""
def __init__(self, period: int = 14):
self.period = period
self.tr_values = []
self.atr_sma = SMA(period)
self.prev_close = None
def update(self, high: float, low: float, close: float):
if self.prev_close is not None:
tr1 = high - low
tr2 = abs(high - self.prev_close)
tr3 = abs(low - self.prev_close)
tr = max(tr1, tr2, tr3)
self.tr_values.append(tr)
self.atr_sma.update(tr)
self.prev_close = close
@property
def value(self) -> Optional[float]:
return self.atr_sma.value
class TrueRange:
"""True Range"""
def __init__(self):
self.prev_close = None
self.tr_value = None
def update(self, high: float, low: float, close: float):
if self.prev_close is not None:
tr1 = high - low
tr2 = abs(high - self.prev_close)
tr3 = abs(low - self.prev_close)
self.tr_value = max(tr1, tr2, tr3)
else:
self.tr_value = high - low
self.prev_close = close
@property
def value(self) -> Optional[float]:
return self.tr_value
class KeltnerChannels:
"""Keltner Channels"""
def __init__(self, period: int = 20, multiplier: float = 2.0):
self.ema = EMA(period)
self.atr = ATR(period)
self.multiplier = multiplier
def update(self, high: float, low: float, close: float):
self.ema.update(close)
self.atr.update(high, low, close)
@property
def middle(self) -> Optional[float]:
return self.ema.value
@property
def upper(self) -> Optional[float]:
if self.ema.value is None or self.atr.value is None:
return None
return self.ema.value + (self.multiplier * self.atr.value)
@property
def lower(self) -> Optional[float]:
if self.ema.value is None or self.atr.value is None:
return None
return self.ema.value - (self.multiplier * self.atr.value)
class DonchianChannels:
"""Donchian Channels"""
def __init__(self, period: int = 20):
self.period = period
self.highs = []
self.lows = []
def update(self, high: float, low: float):
self.highs.append(high)
self.lows.append(low)
if len(self.highs) > self.period * 2:
self.highs = self.highs[-self.period * 2:]
self.lows = self.lows[-self.period * 2:]
@property
def upper(self) -> Optional[float]:
if len(self.highs) < self.period:
return None
return max(self.highs[-self.period:])
@property
def lower(self) -> Optional[float]:
if len(self.lows) < self.period:
return None
return min(self.lows[-self.period:])
@property
def middle(self) -> Optional[float]:
if self.upper is None or self.lower is None:
return None
return (self.upper + self.lower) / 2
class ADX:
"""Average Directional Index"""
def __init__(self, period: int = 14):
self.period = period
self.atr = ATR(period)
self.plus_dm_sma = SMA(period)
self.minus_dm_sma = SMA(period)
self.dx_values = []
self.adx_sma = SMA(period)
self.prev_high = None
self.prev_low = None
def update(self, high: float, low: float, close: float):
self.atr.update(high, low, close)
if self.prev_high is not None and self.prev_low is not None:
plus_dm = max(high - self.prev_high, 0) if high - self.prev_high > self.prev_low - low else 0
minus_dm = max(self.prev_low - low, 0) if self.prev_low - low > high - self.prev_high else 0
self.plus_dm_sma.update(plus_dm)
self.minus_dm_sma.update(minus_dm)
if self.atr.value and self.atr.value > 0:
plus_di = 100 * self.plus_dm_sma.value / self.atr.value
minus_di = 100 * self.minus_dm_sma.value / self.atr.value
if plus_di + minus_di > 0:
dx = 100 * abs(plus_di - minus_di) / (plus_di + minus_di)
self.dx_values.append(dx)
self.adx_sma.update(dx)
self.prev_high = high
self.prev_low = low
@property
def value(self) -> Optional[float]:
return self.adx_sma.value
# ============================================================================
# VOLUME INDICATORS
# ============================================================================
class OBV:
"""On-Balance Volume"""
def __init__(self):
self.obv_value = 0
self.prev_close = None
def update(self, close: float, volume: float):
if self.prev_close is not None:
if close > self.prev_close:
self.obv_value += volume
elif close < self.prev_close:
self.obv_value -= volume
self.prev_close = close
@property
def value(self) -> float:
return self.obv_value
class AccumulationDistribution:
"""Accumulation/Distribution Line"""
def __init__(self):
self.ad_value = 0
def update(self, high: float, low: float, close: float, volume: float):
if high != low:
ad_multiplier = ((close - low) - (high - close)) / (high - low)
self.ad_value += ad_multiplier * volume
@property
def value(self) -> float:
return self.ad_value
class ChaikinMoneyFlow:
"""Chaikin Money Flow"""
def __init__(self, period: int = 20):
self.period = period
self.money_flow_multipliers = []
self.volumes = []
def update(self, high: float, low: float, close: float, volume: float):
if high != low:
mf_multiplier = ((close - low) - (high - close)) / (high - low)
else:
mf_multiplier = 0
self.money_flow_multipliers.append(mf_multiplier * volume)
self.volumes.append(volume)
if len(self.money_flow_multipliers) > self.period * 2:
self.money_flow_multipliers = self.money_flow_multipliers[-self.period * 2:]
self.volumes = self.volumes[-self.period * 2:]
@property
def value(self) -> Optional[float]:
if len(self.money_flow_multipliers) < self.period:
return None
sum_mf = sum(self.money_flow_multipliers[-self.period:])
sum_volume = sum(self.volumes[-self.period:])
return sum_mf / sum_volume if sum_volume > 0 else 0
class VROC(BaseIndicator):
"""Volume Rate of Change"""
def __init__(self, period: int = 25):
super().__init__(period)
self.volumes = []
def update_volume(self, volume: float):
self.volumes.append(volume)
if len(self.volumes) > self.period * 2:
self.volumes = self.volumes[-self.period * 2:]
def _calculate(self) -> float:
if len(self.volumes) < self.period + 1:
return 0
current_volume = self.volumes[-1]
past_volume = self.volumes[-self.period - 1]
if past_volume > 0:
return ((current_volume - past_volume) / past_volume) * 100
return 0
class ForceIndex:
"""Force Index"""
def __init__(self, period: int = 13):
self.ema = EMA(period)
self.prev_close = None
def update(self, close: float, volume: float):
if self.prev_close is not None:
fi = volume * (close - self.prev_close)
self.ema.update(fi)
self.prev_close = close
@property
def value(self) -> Optional[float]:
return self.ema.value
class VWMA(BaseIndicator):
"""Volume Weighted Moving Average"""
def __init__(self, period: int):
super().__init__(period)
self.volumes = []
def update_with_volume(self, price: float, volume: float):
self.values.append(price)
self.volumes.append(volume)
if len(self.values) > self.period * 2:
self.values = self.values[-self.period * 2:]
self.volumes = self.volumes[-self.period * 2:]
def _calculate(self) -> float:
if len(self.values) < self.period:
return np.mean(self.values)
prices = self.values[-self.period:]
volumes = self.volumes[-self.period:]
total_volume = sum(volumes)
if total_volume == 0:
return np.mean(prices)
return sum(p * v for p, v in zip(prices, volumes)) / total_volume
# ============================================================================
# STATISTICAL INDICATORS
# ============================================================================
class StandardDeviation(BaseIndicator):
"""Standard Deviation"""
def _calculate(self) -> float:
return np.std(self.values[-self.period:])
class Variance(BaseIndicator):
"""Variance"""
def _calculate(self) -> float:
return np.var(self.values[-self.period:])
class ZScore(BaseIndicator):
"""Z-Score"""
def _calculate(self) -> float:
if len(self.values) < self.period:
return 0
mean = np.mean(self.values[-self.period:])
std = np.std(self.values[-self.period:])
if std == 0:
return 0
return (self.values[-1] - mean) / std
class LinearRegression:
"""Linear Regression"""
def __init__(self, period: int):
self.period = period
self.values = []
def update(self, value: float):
self.values.append(value)
if len(self.values) > self.period * 2:
self.values = self.values[-self.period * 2:]
@property
def slope(self) -> Optional[float]:
if len(self.values) < self.period:
return None
y = np.array(self.values[-self.period:])
x = np.arange(len(y))
return np.polyfit(x, y, 1)[0]
@property
def intercept(self) -> Optional[float]:
if len(self.values) < self.period:
return None
y = np.array(self.values[-self.period:])
x = np.arange(len(y))
return np.polyfit(x, y, 1)[1]
@property
def value(self) -> Optional[float]:
if self.slope is None or self.intercept is None:
return None
return self.slope * (self.period - 1) + self.intercept
class Correlation:
"""Correlation between two series"""
def __init__(self, period: int):
self.period = period
self.x_values = []
self.y_values = []
def update(self, x: float, y: float):
self.x_values.append(x)
self.y_values.append(y)
if len(self.x_values) > self.period * 2:
self.x_values = self.x_values[-self.period * 2:]
self.y_values = self.y_values[-self.period * 2:]
@property
def value(self) -> Optional[float]:
if len(self.x_values) < self.period:
return None
x = np.array(self.x_values[-self.period:])
y = np.array(self.y_values[-self.period:])
return np.corrcoef(x, y)[0, 1]
# ============================================================================
# PATTERN INDICATORS
# ============================================================================
class PivotPoints:
"""Pivot Points"""
def __init__(self):
self.pivot = None
self.r1 = None
self.r2 = None
self.r3 = None
self.s1 = None
self.s2 = None
self.s3 = None
def update(self, high: float, low: float, close: float):
self.pivot = (high + low + close) / 3
self.r1 = 2 * self.pivot - low
self.s1 = 2 * self.pivot - high
self.r2 = self.pivot + (high - low)
self.s2 = self.pivot - (high - low)
self.r3 = high + 2 * (self.pivot - low)
self.s3 = low - 2 * (high - self.pivot)
class FibonacciRetracement:
"""Fibonacci Retracement"""
def __init__(self):
self.high = None
self.low = None
self.levels = [0.236, 0.382, 0.5, 0.618, 0.786]
def update(self, high: float, low: float):
if self.high is None or high > self.high:
self.high = high
if self.low is None or low < self.low:
self.low = low
def get_retracement_levels(self) -> Dict[str, float]:
if self.high is None or self.low is None:
return {}
diff = self.high - self.low
levels = {}
for level in self.levels:
levels[f"{level:.1%}"] = self.high - (diff * level)
return levels
class ZigZag:
"""ZigZag Indicator"""
def __init__(self, deviation: float = 5.0):
self.deviation = deviation / 100
self.highs = []
self.lows = []
self.last_pivot = None
self.last_pivot_type = None # 'high' or 'low'
self.current_trend = None
def update(self, high: float, low: float, close: float):
self.highs.append(high)
self.lows.append(low)
if self.last_pivot is None:
self.last_pivot = close
return
if self.last_pivot_type == 'high' or self.last_pivot_type is None:
if low < self.last_pivot * (1 - self.deviation):
self.last_pivot = low
self.last_pivot_type = 'low'
self.current_trend = 'down'
if self.last_pivot_type == 'low' or self.last_pivot_type is None:
if high > self.last_pivot * (1 + self.deviation):
self.last_pivot = high
self.last_pivot_type = 'high'
self.current_trend = 'up'
@property
def trend(self) -> Optional[str]:
return self.current_trend
# ============================================================================
# ADVANCED MOMENTUM INDICATORS
# ============================================================================
class UltimateOscillator:
"""Ultimate Oscillator"""
def __init__(self, period1: int = 7, period2: int = 14, period3: int = 28):
self.period1 = period1
self.period2 = period2
self.period3 = period3
self.bp_values = [] # Buying Pressure
self.tr_values = [] # True Range
self.prev_close = None
def update(self, high: float, low: float, close: float):
if self.prev_close is not None:
bp = close - min(low, self.prev_close)
tr = max(high, self.prev_close) - min(low, self.prev_close)
self.bp_values.append(bp)
self.tr_values.append(tr)
if len(self.bp_values) > self.period3 * 2:
self.bp_values = self.bp_values[-self.period3 * 2:]
self.tr_values = self.tr_values[-self.period3 * 2:]
self.prev_close = close
@property
def value(self) -> Optional[float]:
if len(self.bp_values) < self.period3:
return None
avg1 = sum(self.bp_values[-self.period1:]) / sum(self.tr_values[-self.period1:]) if sum(self.tr_values[-self.period1:]) > 0 else 0
avg2 = sum(self.bp_values[-self.period2:]) / sum(self.tr_values[-self.period2:]) if sum(self.tr_values[-self.period2:]) > 0 else 0
avg3 = sum(self.bp_values[-self.period3:]) / sum(self.tr_values[-self.period3:]) if sum(self.tr_values[-self.period3:]) > 0 else 0
return 100 * (4 * avg1 + 2 * avg2 + avg3) / 7
class AwesomeOscillator:
"""Awesome Oscillator"""
def __init__(self, fast_period: int = 5, slow_period: int = 34):
self.fast_sma = SMA(fast_period)
self.slow_sma = SMA(slow_period)
self.hl2_values = []
def update(self, high: float, low: float):
hl2 = (high + low) / 2
self.hl2_values.append(hl2)
self.fast_sma.update(hl2)
self.slow_sma.update(hl2)
@property
def value(self) -> Optional[float]:
if self.fast_sma.value is None or self.slow_sma.value is None:
return None
return self.fast_sma.value - self.slow_sma.value
class WavesTrend:
"""WavesTrend Oscillator"""
def __init__(self, period1: int = 10, period2: int = 21):
self.period1 = period1
self.period2 = period2
self.esa = EMA(period1)
self.d_ema = EMA(period1)
self.ci_ema = EMA(period2)
self.tci_values = []
def update(self, high: float, low: float, close: float):
hlc3 = (high + low + close) / 3
self.esa.update(hlc3)
if self.esa.value is not None:
d = abs(hlc3 - self.esa.value)
self.d_ema.update(d)
if self.d_ema.value is not None and self.d_ema.value > 0:
ci = (hlc3 - self.esa.value) / (0.015 * self.d_ema.value)
self.ci_ema.update(ci)
self.tci_values.append(ci)
@property
def value(self) -> Optional[float]:
return self.ci_ema.value
class DeMarker:
"""DeMarker Indicator"""
def __init__(self, period: int = 14):
self.period = period
self.demax_values = []
self.demin_values = []
self.prev_high = None
self.prev_low = None
def update(self, high: float, low: float):
if self.prev_high is not None and self.prev_low is not None:
demax = max(high - self.prev_high, 0)
demin = max(self.prev_low - low, 0)
self.demax_values.append(demax)
self.demin_values.append(demin)
if len(self.demax_values) > self.period * 2:
self.demax_values = self.demax_values[-self.period * 2:]
self.demin_values = self.demin_values[-self.period * 2:]
self.prev_high = high
self.prev_low = low
@property
def value(self) -> Optional[float]:
if len(self.demax_values) < self.period:
return None
avg_demax = np.mean(self.demax_values[-self.period:])
avg_demin = np.mean(self.demin_values[-self.period:])
if avg_demax + avg_demin == 0:
return 0.5
return avg_demax / (avg_demax + avg_demin)
class AroonIndicator:
"""Aroon Indicator"""
def __init__(self, period: int = 25):
self.period = period
self.highs = []
self.lows = []
def update(self, high: float, low: float):
self.highs.append(high)
self.lows.append(low)
if len(self.highs) > self.period * 2:
self.highs = self.highs[-self.period * 2:]
self.lows = self.lows[-self.period * 2:]
@property
def aroon_up(self) -> Optional[float]:
if len(self.highs) < self.period:
return None
max_idx = np.argmax(self.highs[-self.period:])
return ((self.period - 1 - max_idx) / (self.period - 1)) * 100
@property
def aroon_down(self) -> Optional[float]:
if len(self.lows) < self.period:
return None
min_idx = np.argmin(self.lows[-self.period:])
return ((self.period - 1 - min_idx) / (self.period - 1)) * 100
@property
def aroon_oscillator(self) -> Optional[float]:
if self.aroon_up is None or self.aroon_down is None:
return None
return self.aroon_up - self.aroon_down
# ============================================================================
# ADDITIONAL TREND INDICATORS
# ============================================================================
class McGinleyDynamic(BaseIndicator):
"""McGinley Dynamic"""
def __init__(self, period: int = 10):
super().__init__(period)
self.md_value = None
self.k = 0.6
def _calculate(self) -> float:
if self.md_value is None:
self.md_value = np.mean(self.values)
if len(self.values) >= 2:
md_factor = self.values[-1] / self.md_value
n = self.period * (md_factor ** 4)
alpha = 2 / (n + 1)
self.md_value = self.md_value + alpha * (self.values[-1] - self.md_value)
return self.md_value
class VerticalHorizontalFilter(BaseIndicator):
"""Vertical Horizontal Filter"""
def _calculate(self) -> float:
if len(self.values) < self.period:
return 0
numerator = abs(self.values[-1] - self.values[-self.period])
denominator = sum(abs(self.values[i] - self.values[i-1])
for i in range(-self.period + 1, 0))
return numerator / denominator if denominator > 0 else 0
class SuperTrend:
"""SuperTrend Indicator"""
def __init__(self, period: int = 10, multiplier: float = 3.0):
self.period = period
self.multiplier = multiplier
self.atr = ATR(period)
self.basic_ub = []
self.basic_lb = []
self.final_ub = []
self.final_lb = []
self.supertrend = []
self.trend = []
def update(self, high: float, low: float, close: float):
hl2 = (high + low) / 2
self.atr.update(high, low, close)
if self.atr.value is not None:
basic_ub = hl2 + (self.multiplier * self.atr.value)
basic_lb = hl2 - (self.multiplier * self.atr.value)
self.basic_ub.append(basic_ub)
self.basic_lb.append(basic_lb)
# Calculate final bands
if len(self.final_ub) == 0:
final_ub = basic_ub
final_lb = basic_lb
else:
final_ub = basic_ub if basic_ub < self.final_ub[-1] or close > self.final_ub[-1] else self.final_ub[-1]
final_lb = basic_lb if basic_lb > self.final_lb[-1] or close < self.final_lb[-1] else self.final_lb[-1]
self.final_ub.append(final_ub)
self.final_lb.append(final_lb)
# Determine trend and SuperTrend
if len(self.supertrend) == 0:
if close <= final_ub:
supertrend = final_ub
trend = -1
else:
supertrend = final_lb
trend = 1
else:
prev_supertrend = self.supertrend[-1]
prev_trend = self.trend[-1]
if prev_trend == 1 and close > final_lb:
supertrend = final_lb
trend = 1
elif prev_trend == 1 and close <= final_lb:
supertrend = final_ub
trend = -1
elif prev_trend == -1 and close < final_ub:
supertrend = final_ub
trend = -1
else:
supertrend = final_lb
trend = 1
self.supertrend.append(supertrend)
self.trend.append(trend)
@property
def value(self) -> Optional[float]:
return self.supertrend[-1] if self.supertrend else None
@property
def trend_direction(self) -> Optional[int]:
return self.trend[-1] if self.trend else None
class AlmaIndicator(BaseIndicator):
"""Arnaud Legoux Moving Average (ALMA)"""
def __init__(self, period: int = 9, offset: float = 0.85, sigma: float = 6):
super().__init__(period)
self.offset = offset
self.sigma = sigma
self.weights = self._calculate_weights()
def _calculate_weights(self):
m = self.offset * (self.period - 1)
s = self.period / self.sigma
weights = []
for i in range(self.period):
weights.append(np.exp(-((i - m) ** 2) / (2 * s * s)))
weight_sum = sum(weights)
return [w / weight_sum for w in weights]
def _calculate(self) -> float:
if len(self.values) < self.period:
return np.mean(self.values)
return sum(self.values[i] * self.weights[i]
for i in range(-self.period, 0))
# ============================================================================
# VOLUME-PRICE INDICATORS
# ============================================================================
class MoneyFlowIndex:
"""Money Flow Index"""
def __init__(self, period: int = 14):
self.period = period
self.typical_prices = []
self.raw_money_flows = []
self.prev_tp = None
def update(self, high: float, low: float, close: float, volume: float):
tp = (high + low + close) / 3
raw_money_flow = tp * volume
self.typical_prices.append(tp)
self.raw_money_flows.append(raw_money_flow)
if len(self.typical_prices) > self.period * 2:
self.typical_prices = self.typical_prices[-self.period * 2:]
self.raw_money_flows = self.raw_money_flows[-self.period * 2:]
self.prev_tp = tp
@property
def value(self) -> Optional[float]:
if len(self.typical_prices) < self.period + 1:
return None
positive_flow = 0
negative_flow = 0
for i in range(-self.period, 0):
if i == -self.period:
continue
if self.typical_prices[i] > self.typical_prices[i-1]:
positive_flow += self.raw_money_flows[i]
elif self.typical_prices[i] < self.typical_prices[i-1]:
negative_flow += self.raw_money_flows[i]
if negative_flow == 0:
return 100
money_ratio = positive_flow / negative_flow
return 100 - (100 / (1 + money_ratio))
class VolumeOscillator:
"""Volume Oscillator"""
def __init__(self, fast_period: int = 5, slow_period: int = 10):
self.fast_sma = SMA(fast_period)
self.slow_sma = SMA(slow_period)
def update(self, volume: float):
self.fast_sma.update(volume)
self.slow_sma.update(volume)
@property
def value(self) -> Optional[float]:
if self.fast_sma.value is None or self.slow_sma.value is None or self.slow_sma.value == 0:
return None
return ((self.fast_sma.value - self.slow_sma.value) / self.slow_sma.value) * 100
class EaseOfMovement:
"""Ease of Movement"""
def __init__(self, period: int = 14):
self.period = period
self.eom_values = []
self.sma = SMA(period)
self.prev_high = None
self.prev_low = None
def update(self, high: float, low: float, volume: float):
if self.prev_high is not None and self.prev_low is not None:
distance_moved = ((high + low) / 2) - ((self.prev_high + self.prev_low) / 2)
scale = volume / (high - low) if high != low else 0
eom = distance_moved / scale if scale != 0 else 0
self.eom_values.append(eom)
self.sma.update(eom)
self.prev_high = high
self.prev_low = low
@property
def value(self) -> Optional[float]:
return self.sma.value
class NegativeVolumeIndex:
"""Negative Volume Index"""
def __init__(self):
self.nvi = 100
self.prev_close = None
self.prev_volume = None
def update(self, close: float, volume: float):
if self.prev_close is not None and self.prev_volume is not None:
if volume < self.prev_volume:
self.nvi = self.nvi * (close / self.prev_close)
self.prev_close = close
self.prev_volume = volume
@property
def value(self) -> float:
return self.nvi
class PositiveVolumeIndex:
"""Positive Volume Index"""
def __init__(self):
self.pvi = 100
self.prev_close = None
self.prev_volume = None
def update(self, close: float, volume: float):
if self.prev_close is not None and self.prev_volume is not None:
if volume > self.prev_volume:
self.pvi = self.pvi * (close / self.prev_close)
self.prev_close = close
self.prev_volume = volume
@property
def value(self) -> float:
return self.pvi
# ============================================================================
# VOLATILITY AND BANDS
# ============================================================================
class StandardError(BaseIndicator):
"""Standard Error"""
def _calculate(self) -> float:
if len(self.values) < 2:
return 0
return np.std(self.values[-self.period:]) / np.sqrt(self.period)
class MeanDeviation(BaseIndicator):
"""Mean Deviation"""
def _calculate(self) -> float:
mean = np.mean(self.values[-self.period:])
return np.mean([abs(x - mean) for x in self.values[-self.period:]])
class ChoppinessIndex(BaseIndicator):
"""Choppiness Index"""
def __init__(self, period: int = 14):
super().__init__(period)
self.highs = []
self.lows = []
self.atr_values = []
def update_hlc(self, high: float, low: float, close: float):
self.highs.append(high)
self.lows.append(low)
self.values.append(close)
if len(self.values) >= 2:
tr = max(high - low,
abs(high - self.values[-2]),
abs(low - self.values[-2]))
self.atr_values.append(tr)
if len(self.highs) > self.period * 2:
self.highs = self.highs[-self.period * 2:]
self.lows = self.lows[-self.period * 2:]
self.atr_values = self.atr_values[-self.period * 2:]
def _calculate(self) -> float:
if len(self.highs) < self.period or len(self.atr_values) < self.period:
return 50
highest_high = max(self.highs[-self.period:])
lowest_low = min(self.lows[-self.period:])
atr_sum = sum(self.atr_values[-self.period:])
if highest_high - lowest_low == 0:
return 50
ci = 100 * np.log10(atr_sum / (highest_high - lowest_low)) / np.log10(self.period)
return ci
class RelativeVolatilityIndex:
"""Relative Volatility Index"""
def __init__(self, period: int = 10):
self.period = period
self.price_changes = []
self.up_changes = []
self.down_changes = []
self.up_rsi = RSI(period)
self.down_rsi = RSI(period)
self.prev_price = None
def update(self, price: float):
if self.prev_price is not None:
std_up = 0
std_down = 0
if len(self.price_changes) >= 9: # Need at least 10 prices for std
recent_prices = [self.prev_price] + self.price_changes[-8:]
std_dev = np.std(recent_prices)
if price > self.prev_price:
std_up = std_dev
else:
std_down = std_dev
self.up_rsi.update(std_up)
self.down_rsi.update(std_down)
self.price_changes.append(price)
self.prev_price = price
@property
def value(self) -> Optional[float]:
if self.up_rsi.value is None or self.down_rsi.value is None:
return None
return (self.up_rsi.value + self.down_rsi.value) / 2
# ============================================================================
# MARKET STRUCTURE INDICATORS
# ============================================================================
class SwingIndex:
"""Swing Index"""
def __init__(self):
self.prev_ohlc = None
self.si_values = []
def update(self, open_price: float, high: float, low: float, close: float):
if self.prev_ohlc is not None:
po, ph, pl, pc = self.prev_ohlc
# Calculate A, B, C components
a = abs(high - pc)
b = abs(low - pc)
c = abs(high - low)
# Calculate K
if a > b and a > c:
k = a - 0.5 * b + 0.25 * (pc - po)
elif b > a and b > c:
k = b - 0.5 * a + 0.25 * (pc - po)
else:
k = c + 0.25 * (pc - po)
# Calculate R
r = max(abs(high - pc), abs(low - pc))
# Calculate Swing Index
if k != 0 and r != 0:
si = 50 * ((close - pc + 0.5 * (close - open_price) + 0.25 * (pc - po)) / k) * (200 / r)
else:
si = 0
self.si_values.append(si)
self.prev_ohlc = (open_price, high, low, close)
@property
def value(self) -> Optional[float]:
return self.si_values[-1] if self.si_values else None
class AccumulativeSwingIndex:
"""Accumulative Swing Index"""
def __init__(self):
self.swing_index = SwingIndex()
self.asi_value = 0
def update(self, open_price: float, high: float, low: float, close: float):
self.swing_index.update(open_price, high, low, close)
if self.swing_index.value is not None:
self.asi_value += self.swing_index.value
@property
def value(self) -> float:
return self.asi_value
class FractalIndicator:
"""Fractal Indicator"""
def __init__(self, period: int = 5):
self.period = period
self.highs = []
self.lows = []
self.bull_fractals = []
self.bear_fractals = []
def update(self, high: float, low: float):
self.highs.append(high)
self.lows.append(low)
if len(self.highs) >= self.period:
middle = self.period // 2
# Check for bullish fractal (high point)
is_bull_fractal = True
for i in range(self.period):
if i != middle and self.highs[-self.period + i] >= self.highs[-self.period + middle]:
is_bull_fractal = False
break
# Check for bearish fractal (low point)
is_bear_fractal = True
for i in range(self.period):
if i != middle and self.lows[-self.period + i] <= self.lows[-self.period + middle]:
is_bear_fractal = False
break
self.bull_fractals.append(is_bull_fractal)
self.bear_fractals.append(is_bear_fractal)
else:
self.bull_fractals.append(False)
self.bear_fractals.append(False)
@property
def is_bull_fractal(self) -> bool:
return self.bull_fractals[-1] if self.bull_fractals else False
@property
def is_bear_fractal(self) -> bool:
return self.bear_fractals[-1] if self.bear_fractals else False
# ============================================================================
# ADVANCED MATHEMATICAL INDICATORS
# ============================================================================
class HilbertTransform:
"""Hilbert Transform - Dominant Cycle Period"""
def __init__(self):
self.prices = []
self.smooth_prices = []
self.detrender = []
self.i1 = []
self.q1 = []
self.ji = []
self.jq = []
self.i2 = []
self.q2 = []
self.re = []
self.im = []
self.period = []
self.smooth_period = []
def update(self, price: float):
self.prices.append(price)
if len(self.prices) < 7:
self.smooth_prices.append(price)
self.detrender.append(0)
self.i1.append(0)
self.q1.append(0)
self.ji.append(0)
self.jq.append(0)
self.i2.append(0)
self.q2.append(0)
self.re.append(0)
self.im.append(0)
self.period.append(15)
self.smooth_period.append(15)
return
# Smooth the price
smooth = (4*price + 3*self.prices[-2] + 2*self.prices[-3] + self.prices[-4]) / 10
self.smooth_prices.append(smooth)
# Detrender
detrender_val = (0.0962*smooth + 0.5769*self.smooth_prices[-2]
- 0.5769*self.smooth_prices[-4] - 0.0962*self.smooth_prices[-6])
self.detrender.append(detrender_val)
# Compute InPhase and Quadrature components
i1_val = self.detrender[-4] if len(self.detrender) >= 4 else 0
q1_val = (0.0962*detrender_val + 0.5769*self.detrender[-2]
- 0.5769*self.detrender[-4] - 0.0962*self.detrender[-6]) if len(self.detrender) >= 6 else 0
self.i1.append(i1_val)
self.q1.append(q1_val)
# Advance the phase by 90 degrees
ji_val = (0.0962*i1_val + 0.5769*self.i1[-2]
- 0.5769*self.i1[-4] - 0.0962*self.i1[-6]) if len(self.i1) >= 6 else 0
jq_val = (0.0962*q1_val + 0.5769*self.q1[-2]
- 0.5769*self.q1[-4] - 0.0962*self.q1[-6]) if len(self.q1) >= 6 else 0
self.ji.append(ji_val)
self.jq.append(jq_val)
# Phasor addition
i2_val = i1_val - jq_val
q2_val = q1_val + ji_val
# Smooth the I and Q components
i2_val = 0.2*i2_val + 0.8*self.i2[-1] if self.i2 else i2_val
q2_val = 0.2*q2_val + 0.8*self.q2[-1] if self.q2 else q2_val
self.i2.append(i2_val)
self.q2.append(q2_val)
# Homodyne Discriminator
re_val = i2_val*self.i2[-1] + q2_val*self.q2[-1] if len(self.i2) >= 2 else 0
im_val = i2_val*self.q2[-1] - q2_val*self.i2[-1] if len(self.i2) >= 2 else 0
re_val = 0.2*re_val + 0.8*self.re[-1] if self.re else re_val
im_val = 0.2*im_val + 0.8*self.im[-1] if self.im else im_val
self.re.append(re_val)
self.im.append(im_val)
# Compute the period
if im_val != 0 and re_val != 0:
period_val = 2*np.pi / np.arctan(im_val/re_val)
else:
period_val = self.period[-1] if self.period else 15
if period_val > 1.5*self.period[-1] if self.period else False:
period_val = 1.5*self.period[-1]
elif period_val < 0.67*self.period[-1] if self.period else False:
period_val = 0.67*self.period[-1]
if period_val < 6:
period_val = 6
elif period_val > 50:
period_val = 50
period_val = 0.2*period_val + 0.8*self.period[-1] if self.period else period_val
self.period.append(period_val)
self.smooth_period.append(0.33*period_val + 0.67*self.smooth_period[-1] if self.smooth_period else period_val)
@property
def dominant_cycle(self) -> Optional[float]:
return self.smooth_period[-1] if self.smooth_period else None
class ChandeMomentumOscillator(BaseIndicator):
"""Chande Momentum Oscillator"""
def _calculate(self) -> float:
if len(self.values) < 2:
return 0
gains = 0
losses = 0
for i in range(-self.period, 0):
if i >= -len(self.values) + 1:
change = self.values[i] - self.values[i-1]
if change > 0:
gains += change
else:
losses -= change
if gains + losses == 0:
return 0
return 100 * (gains - losses) / (gains + losses)
class KnowSureThing:
"""Know Sure Thing (KST)"""
def __init__(self, roc1: int = 10, roc2: int = 15, roc3: int = 20, roc4: int = 30,
sma1: int = 10, sma2: int = 10, sma3: int = 10, sma4: int = 15, signal: int = 9):
self.roc1 = ROC(roc1)
self.roc2 = ROC(roc2)
self.roc3 = ROC(roc3)
self.roc4 = ROC(roc4)
self.sma1 = SMA(sma1)
self.sma2 = SMA(sma2)
self.sma3 = SMA(sma3)
self.sma4 = SMA(sma4)
self.signal_sma = SMA(signal)
self.kst_values = []
def update(self, price: float):
self.roc1.update(price)
self.roc2.update(price)
self.roc3.update(price)
self.roc4.update(price)
if self.roc1.value is not None:
self.sma1.update(self.roc1.value)
if self.roc2.value is not None:
self.sma2.update(self.roc2.value)
if self.roc3.value is not None:
self.sma3.update(self.roc3.value)
if self.roc4.value is not None:
self.sma4.update(self.roc4.value)
if all(sma.value is not None for sma in [self.sma1, self.sma2, self.sma3, self.sma4]):
kst = (1*self.sma1.value + 2*self.sma2.value + 3*self.sma3.value + 4*self.sma4.value)
self.kst_values.append(kst)
self.signal_sma.update(kst)
@property
def kst(self) -> Optional[float]:
return self.kst_values[-1] if self.kst_values else None
@property
def signal(self) -> Optional[float]:
return self.signal_sma.value
# ============================================================================
# ADDITIONAL OSCILLATORS AND INDICATORS
# ============================================================================
class PercentagePriceOscillator:
"""Percentage Price Oscillator (PPO)"""
def __init__(self, fast_period: int = 12, slow_period: int = 26, signal_period: int = 9):
self.fast_ema = EMA(fast_period)
self.slow_ema = EMA(slow_period)
self.signal_ema = EMA(signal_period)
self.ppo_values = []
def update(self, price: float):
self.fast_ema.update(price)
self.slow_ema.update(price)
if self.fast_ema.value is not None and self.slow_ema.value is not None and self.slow_ema.value != 0:
ppo = ((self.fast_ema.value - self.slow_ema.value) / self.slow_ema.value) * 100
self.ppo_values.append(ppo)
self.signal_ema.update(ppo)
@property
def ppo(self) -> Optional[float]:
return self.ppo_values[-1] if self.ppo_values else None
@property
def signal(self) -> Optional[float]:
return self.signal_ema.value
@property
def histogram(self) -> Optional[float]:
if self.ppo is not None and self.signal is not None:
return self.ppo - self.signal
return None
class DetrendedPriceOscillator(BaseIndicator):
"""Detrended Price Oscillator (DPO)"""
def _calculate(self) -> float:
if len(self.values) < self.period:
return 0
sma = np.mean(self.values[-self.period:])
lookback_index = self.period // 2 + 1
if len(self.values) >= lookback_index:
return self.values[-lookback_index] - sma
return 0
class PriceOscillator:
"""Price Oscillator"""
def __init__(self, fast_period: int = 10, slow_period: int = 20):
self.fast_sma = SMA(fast_period)
self.slow_sma = SMA(slow_period)
def update(self, price: float):
self.fast_sma.update(price)
self.slow_sma.update(price)
@property
def value(self) -> Optional[float]:
if self.fast_sma.value is None or self.slow_sma.value is None:
return None
return self.fast_sma.value - self.slow_sma.value
class SchaffTrendCycle:
"""Schaff Trend Cycle"""
def __init__(self, cycle_period: int = 10, fast_period: int = 23, slow_period: int = 50):
self.cycle_period = cycle_period
self.fast_ema = EMA(fast_period)
self.slow_ema = EMA(slow_period)
self.macd_values = []
self.k_values = []
self.d_values = []
self.stc_values = []
def update(self, price: float):
self.fast_ema.update(price)
self.slow_ema.update(price)
if self.fast_ema.value is not None and self.slow_ema.value is not None:
macd = self.fast_ema.value - self.slow_ema.value
self.macd_values.append(macd)
if len(self.macd_values) >= self.cycle_period:
# First stochastic calculation on MACD
macd_low = min(self.macd_values[-self.cycle_period:])
macd_high = max(self.macd_values[-self.cycle_period:])
if macd_high - macd_low != 0:
k = 100 * (macd - macd_low) / (macd_high - macd_low)
else:
k = 50
self.k_values.append(k)
if len(self.k_values) >= 3:
d = np.mean(self.k_values[-3:])
self.d_values.append(d)
if len(self.d_values) >= self.cycle_period:
# Second stochastic calculation on %D
d_low = min(self.d_values[-self.cycle_period:])
d_high = max(self.d_values[-self.cycle_period:])
if d_high - d_low != 0:
stc = 100 * (d - d_low) / (d_high - d_low)
else:
stc = 50
self.stc_values.append(stc)
@property
def value(self) -> Optional[float]:
return self.stc_values[-1] if self.stc_values else None
class ElderRayIndex:
"""Elder Ray Index (Bull Power and Bear Power)"""
def __init__(self, period: int = 13):
self.ema = EMA(period)
self.bull_power = []
self.bear_power = []
def update(self, high: float, low: float, close: float):
self.ema.update(close)
if self.ema.value is not None:
bull = high - self.ema.value
bear = low - self.ema.value
self.bull_power.append(bull)
self.bear_power.append(bear)
@property
def bull_power_value(self) -> Optional[float]:
return self.bull_power[-1] if self.bull_power else None
@property
def bear_power_value(self) -> Optional[float]:
return self.bear_power[-1] if self.bear_power else None
class KaufmanEfficiencyRatio(BaseIndicator):
"""Kaufman Efficiency Ratio"""
def _calculate(self) -> float:
if len(self.values) < self.period + 1:
return 0
direction = abs(self.values[-1] - self.values[-self.period-1])
volatility = sum(abs(self.values[i] - self.values[i-1])
for i in range(-self.period, 0))
return direction / volatility if volatility > 0 else 0
class RelativeVigorIndex:
"""Relative Vigor Index"""
def __init__(self, period: int = 10):
self.period = period
self.closes = []
self.opens = []
self.highs = []
self.lows = []
self.rvi_values = []
self.signal_values = []
def update(self, open_price: float, high: float, low: float, close: float):
self.opens.append(open_price)
self.highs.append(high)
self.lows.append(low)
self.closes.append(close)
if len(self.closes) > self.period * 2:
self.opens = self.opens[-self.period * 2:]
self.highs = self.highs[-self.period * 2:]
self.lows = self.lows[-self.period * 2:]
self.closes = self.closes[-self.period * 2:]
if len(self.closes) >= self.period:
numerator = sum((self.closes[i] - self.opens[i])
for i in range(-self.period, 0))
denominator = sum((self.highs[i] - self.lows[i])
for i in range(-self.period, 0))
if denominator != 0:
rvi = numerator / denominator
self.rvi_values.append(rvi)
# Signal line is 4-period SMA of RVI
if len(self.rvi_values) >= 4:
signal = np.mean(self.rvi_values[-4:])
self.signal_values.append(signal)
@property
def rvi(self) -> Optional[float]:
return self.rvi_values[-1] if self.rvi_values else None
@property
def signal(self) -> Optional[float]:
return self.signal_values[-1] if self.signal_values else None
class MarketFacilitationIndex:
"""Market Facilitation Index"""
def __init__(self):
self.mfi_values = []
def update(self, high: float, low: float, volume: float):
if volume > 0:
mfi = (high - low) / volume
else:
mfi = 0
self.mfi_values.append(mfi)
@property
def value(self) -> Optional[float]:
return self.mfi_values[-1] if self.mfi_values else None
class IchimokuKinkoHyo:
"""Ichimoku Kinko Hyo"""
def __init__(self, tenkan_period: int = 9, kijun_period: int = 26,
senkou_b_period: int = 52, displacement: int = 26):
self.tenkan_period = tenkan_period
self.kijun_period = kijun_period
self.senkou_b_period = senkou_b_period
self.displacement = displacement
self.highs = []
self.lows = []
self.closes = []
self.tenkan_sen = []
self.kijun_sen = []
self.senkou_span_a = []
self.senkou_span_b = []
self.chikou_span = []
def update(self, high: float, low: float, close: float):
self.highs.append(high)
self.lows.append(low)
self.closes.append(close)
# Tenkan-sen (Conversion Line)
if len(self.highs) >= self.tenkan_period:
tenkan_high = max(self.highs[-self.tenkan_period:])
tenkan_low = min(self.lows[-self.tenkan_period:])
tenkan = (tenkan_high + tenkan_low) / 2
self.tenkan_sen.append(tenkan)
else:
self.tenkan_sen.append(None)
# Kijun-sen (Base Line)
if len(self.highs) >= self.kijun_period:
kijun_high = max(self.highs[-self.kijun_period:])
kijun_low = min(self.lows[-self.kijun_period:])
kijun = (kijun_high + kijun_low) / 2
self.kijun_sen.append(kijun)
else:
self.kijun_sen.append(None)
# Senkou Span A (Leading Span A)
if len(self.tenkan_sen) > 0 and len(self.kijun_sen) > 0:
if self.tenkan_sen[-1] is not None and self.kijun_sen[-1] is not None:
senkou_a = (self.tenkan_sen[-1] + self.kijun_sen[-1]) / 2
self.senkou_span_a.append(senkou_a)
else:
self.senkou_span_a.append(None)
else:
self.senkou_span_a.append(None)
# Senkou Span B (Leading Span B)
if len(self.highs) >= self.senkou_b_period:
senkou_b_high = max(self.highs[-self.senkou_b_period:])
senkou_b_low = min(self.lows[-self.senkou_b_period:])
senkou_b = (senkou_b_high + senkou_b_low) / 2
self.senkou_span_b.append(senkou_b)
else:
self.senkou_span_b.append(None)
# Chikou Span (Lagging Span)
self.chikou_span.append(close)
@property
def tenkan(self) -> Optional[float]:
return self.tenkan_sen[-1] if self.tenkan_sen else None
@property
def kijun(self) -> Optional[float]:
return self.kijun_sen[-1] if self.kijun_sen else None
@property
def senkou_a(self) -> Optional[float]:
return self.senkou_span_a[-1] if self.senkou_span_a else None
@property
def senkou_b(self) -> Optional[float]:
return self.senkou_span_b[-1] if self.senkou_span_b else None
@property
def chikou(self) -> Optional[float]:
return self.chikou_span[-1] if self.chikou_span else None
class PVT:
"""Price Volume Trend"""
def __init__(self):
self.pvt_value = 0
self.prev_close = None
def update(self, close: float, volume: float):
if self.prev_close is not None and self.prev_close != 0:
pvt_change = volume * ((close - self.prev_close) / self.prev_close)
self.pvt_value += pvt_change
self.prev_close = close
@property
def value(self) -> float:
return self.pvt_value
class TypicalPrice(BaseIndicator):
"""Typical Price (HLC/3)"""
def __init__(self):
super().__init__(1)
self.highs = []
self.lows = []
def update_hlc(self, high: float, low: float, close: float):
self.highs.append(high)
self.lows.append(low)
self.values.append(close)
def _calculate(self) -> float:
if not self.highs or not self.lows or not self.values:
return 0
return (self.highs[-1] + self.lows[-1] + self.values[-1]) / 3
class WeightedClose(BaseIndicator):
"""Weighted Close (HLCC/4)"""
def __init__(self):
super().__init__(1)
self.highs = []
self.lows = []
def update_hlc(self, high: float, low: float, close: float):
self.highs.append(high)
self.lows.append(low)
self.values.append(close)
def _calculate(self) -> float:
if not self.highs or not self.lows or not self.values:
return 0
return (self.highs[-1] + self.lows[-1] + 2 * self.values[-1]) / 4
class MedianPrice(BaseIndicator):
"""Median Price (HL/2)"""
def __init__(self):
super().__init__(1)
self.highs = []
self.lows = []
def update_hl(self, high: float, low: float):
self.highs.append(high)
self.lows.append(low)
@property
def value(self) -> Optional[float]:
if not self.highs or not self.lows:
return None
return (self.highs[-1] + self.lows[-1]) / 2
class AbsolutePriceOscillator:
"""Absolute Price Oscillator (APO)"""
def __init__(self, fast_period: int = 12, slow_period: int = 26):
self.fast_ema = EMA(fast_period)
self.slow_ema = EMA(slow_period)
def update(self, price: float):
self.fast_ema.update(price)
self.slow_ema.update(price)
@property
def value(self) -> Optional[float]:
if self.fast_ema.value is None or self.slow_ema.value is None:
return None
return self.fast_ema.value - self.slow_ema.value
class BalanceOfPower:
"""Balance of Power"""
def __init__(self):
self.bop_values = []
def update(self, open_price: float, high: float, low: float, close: float):
if high != low:
bop = (close - open_price) / (high - low)
else:
bop = 0
self.bop_values.append(bop)
@property
def value(self) -> Optional[float]:
return self.bop_values[-1] if self.bop_values else None
class CooppockCurve:
"""Coppock Curve"""
def __init__(self, wma_period: int = 10, roc1_period: int = 14, roc2_period: int = 11):
self.roc1 = ROC(roc1_period)
self.roc2 = ROC(roc2_period)
self.wma = WMA(wma_period)
self.roc_sum_values = []
def update(self, price: float):
self.roc1.update(price)
self.roc2.update(price)
if self.roc1.value is not None and self.roc2.value is not None:
roc_sum = self.roc1.value + self.roc2.value
self.roc_sum_values.append(roc_sum)
self.wma.update(roc_sum)
@property
def value(self) -> Optional[float]:
return self.wma.value
class RainbowOscillator:
"""Rainbow Oscillator"""
def __init__(self, period: int = 2):
self.period = period
self.sma_levels = [SMA(period) for _ in range(10)]
self.rb_values = []
def update(self, price: float):
# First level
self.sma_levels[0].update(price)
# Subsequent levels
for i in range(1, 10):
if self.sma_levels[i-1].value is not None:
self.sma_levels[i].update(self.sma_levels[i-1].value)
# Calculate Rainbow Oscillator
if self.sma_levels[0].value is not None and self.sma_levels[9].value is not None:
if self.sma_levels[9].value != 0:
rb = 100 * (price - self.sma_levels[9].value) / self.sma_levels[9].value
self.rb_values.append(rb)
@property
def value(self) -> Optional[float]:
return self.rb_values[-1] if self.rb_values else None
class KeltnerBands:
"""Keltner Bands (alternative implementation)"""
def __init__(self, period: int = 20, multiplier: float = 1.5):
self.ema = EMA(period)
self.sma_tr = SMA(period)
self.multiplier = multiplier
self.prev_close = None
def update(self, high: float, low: float, close: float):
self.ema.update(close)
if self.prev_close is not None:
tr = max(high - low, abs(high - self.prev_close), abs(low - self.prev_close))
self.sma_tr.update(tr)
self.prev_close = close
@property
def middle(self) -> Optional[float]:
return self.ema.value
@property
def upper(self) -> Optional[float]:
if self.ema.value is None or self.sma_tr.value is None:
return None
return self.ema.value + (self.multiplier * self.sma_tr.value)
@property
def lower(self) -> Optional[float]:
if self.ema.value is None or self.sma_tr.value is None:
return None
return self.ema.value - (self.multiplier * self.sma_tr.value)
class StochasticMomentumIndex:
"""Stochastic Momentum Index"""
def __init__(self, k_period: int = 10, d_period: int = 3):
self.k_period = k_period
self.d_period = d_period
self.highs = []
self.lows = []
self.closes = []
self.smi_values = []
self.signal_sma = SMA(d_period)
def update(self, high: float, low: float, close: float):
self.highs.append(high)
self.lows.append(low)
self.closes.append(close)
if len(self.closes) > self.k_period * 2:
self.highs = self.highs[-self.k_period * 2:]
self.lows = self.lows[-self.k_period * 2:]
self.closes = self.closes[-self.k_period * 2:]
if len(self.closes) >= self.k_period:
highest_high = max(self.highs[-self.k_period:])
lowest_low = min(self.lows[-self.k_period:])
if highest_high != lowest_low:
hl_range = highest_high - lowest_low
close_position = close - (highest_high + lowest_low) / 2
smi = 100 * (close_position / (hl_range / 2))
else:
smi = 0
self.smi_values.append(smi)
self.signal_sma.update(smi)
@property
def smi(self) -> Optional[float]:
return self.smi_values[-1] if self.smi_values else None
@property
def signal(self) -> Optional[float]:
return self.signal_sma.value
# Indicators module
[docs]
class indicators:
# Basic indicators (5)
SMA = SMA
EMA = EMA
RSI = RSI
MACD = MACD
BollingerBands = BollingerBands
# Trend indicators (11)
WMA = WMA
DEMA = DEMA
TEMA = TEMA
KAMA = KAMA
HullMA = HullMA
VWAP = VWAP
ParabolicSAR = ParabolicSAR
McGinleyDynamic = McGinleyDynamic
VerticalHorizontalFilter = VerticalHorizontalFilter
SuperTrend = SuperTrend
AlmaIndicator = AlmaIndicator
# Momentum indicators (25)
Stochastic = Stochastic
WilliamsR = WilliamsR
CCI = CCI
ROC = ROC
Momentum = Momentum
StochasticRSI = StochasticRSI
TRIX = TRIX
UltimateOscillator = UltimateOscillator
AwesomeOscillator = AwesomeOscillator
WavesTrend = WavesTrend
DeMarker = DeMarker
AroonIndicator = AroonIndicator
ChandeMomentumOscillator = ChandeMomentumOscillator
KnowSureThing = KnowSureThing
PercentagePriceOscillator = PercentagePriceOscillator
DetrendedPriceOscillator = DetrendedPriceOscillator
PriceOscillator = PriceOscillator
SchaffTrendCycle = SchaffTrendCycle
ElderRayIndex = ElderRayIndex
KaufmanEfficiencyRatio = KaufmanEfficiencyRatio
RelativeVigorIndex = RelativeVigorIndex
AbsolutePriceOscillator = AbsolutePriceOscillator
CooppockCurve = CooppockCurve
RainbowOscillator = RainbowOscillator
StochasticMomentumIndex = StochasticMomentumIndex
# Volatility indicators (11)
ATR = ATR
TrueRange = TrueRange
KeltnerChannels = KeltnerChannels
DonchianChannels = DonchianChannels
ADX = ADX
StandardError = StandardError
MeanDeviation = MeanDeviation
ChoppinessIndex = ChoppinessIndex
RelativeVolatilityIndex = RelativeVolatilityIndex
KeltnerBands = KeltnerBands
# Volume indicators (13)
OBV = OBV
AccumulationDistribution = AccumulationDistribution
ChaikinMoneyFlow = ChaikinMoneyFlow
VROC = VROC
ForceIndex = ForceIndex
VWMA = VWMA
MoneyFlowIndex = MoneyFlowIndex
VolumeOscillator = VolumeOscillator
EaseOfMovement = EaseOfMovement
NegativeVolumeIndex = NegativeVolumeIndex
PositiveVolumeIndex = PositiveVolumeIndex
MarketFacilitationIndex = MarketFacilitationIndex
PVT = PVT
# Statistical indicators (5)
StandardDeviation = StandardDeviation
Variance = Variance
ZScore = ZScore
LinearRegression = LinearRegression
Correlation = Correlation
# Pattern indicators (3)
PivotPoints = PivotPoints
FibonacciRetracement = FibonacciRetracement
ZigZag = ZigZag
# Market Structure indicators (3)
SwingIndex = SwingIndex
AccumulativeSwingIndex = AccumulativeSwingIndex
FractalIndicator = FractalIndicator
# Advanced Mathematical indicators (1)
HilbertTransform = HilbertTransform
# Price indicators (4)
TypicalPrice = TypicalPrice
WeightedClose = WeightedClose
MedianPrice = MedianPrice
BalanceOfPower = BalanceOfPower
# Complex indicators (1)
IchimokuKinkoHyo = IchimokuKinkoHyo
[docs]
class BaseStrategy:
"""Base strategy class"""
[docs]
def __init__(self):
self.data: Dict[str, pd.DataFrame] = {}
self.current_index = 0
self.portfolio: Optional[Portfolio] = None
self.indicators: Dict[str, Dict[str, BaseIndicator]] = {}
[docs]
def init_indicators(self):
"""Initialize technical indicators - to be implemented by subclasses"""
pass
[docs]
def next(self):
"""Strategy logic for each bar - to be implemented by subclasses"""
raise NotImplementedError
[docs]
def buy(self, symbol: str, size: float = 1.0, price: Optional[float] = None):
"""Place a buy order"""
if self.portfolio is None:
return
current_price = price or self.data[symbol].iloc[self.current_index]['Close']
current_time = self.data[symbol].index[self.current_index]
# Calculate quantity based on size (as percentage of portfolio)
available_cash = self.portfolio.cash
order_value = available_cash * size
quantity = order_value / current_price
if available_cash >= order_value:
trade = Trade(symbol, quantity, current_price, current_time, action='BUY')
self.portfolio.add_trade(trade)
[docs]
def sell(self, symbol: str, size: Optional[float] = None, price: Optional[float] = None):
"""Place a sell order"""
if self.portfolio is None or symbol not in self.portfolio.positions:
return
current_price = price or self.data[symbol].iloc[self.current_index]['Close']
current_time = self.data[symbol].index[self.current_index]
position = self.portfolio.positions[symbol]
quantity = size or position.quantity
trade = Trade(symbol, quantity, current_price, current_time, action='SELL')
self.portfolio.add_trade(trade)
[docs]
def position(self, symbol: str) -> Optional[Position]:
"""Get current position for symbol"""
if self.portfolio is None:
return None
return self.portfolio.positions.get(symbol)
[docs]
class BacktestResults:
"""Backtest results container"""
[docs]
def __init__(self, portfolio: Portfolio, metrics: PerformanceMetrics):
self.portfolio = portfolio
self.metrics = metrics
[docs]
def summary(self) -> str:
"""Generate summary report"""
return f"""
BACKTEST RESULTS SUMMARY
=====================================
Initial Capital: ${self.portfolio.initial_cash:,.2f}
Final Portfolio Value: ${self.portfolio.total_equity:,.2f}
Total Return: {self.metrics.total_return:.2f}%
Annualized Return: {self.metrics.annualized_return:.2f}%
Max Drawdown: {self.metrics.max_drawdown:.2f}%
Sharpe Ratio: {self.metrics.sharpe_ratio:.2f}
Sortino Ratio: {self.metrics.sortino_ratio:.2f}
Calmar Ratio: {self.metrics.calmar_ratio:.2f}
Volatility: {self.metrics.volatility:.2f}%
Win Rate: {self.metrics.win_rate:.2f}%
Profit Factor: {self.metrics.profit_factor:.2f}
Total Trades: {len(self.metrics.trades)}
"""
[docs]
class Backtest:
"""Main backtesting engine"""
[docs]
def __init__(self, strategy: BaseStrategy, initial_cash: float = 100000.0):
self.strategy = strategy
self.initial_cash = initial_cash
self.data_feed: Optional[DataFeed] = None
[docs]
def add_data_source(self, data_feed: DataFeed):
"""Add data source"""
self.data_feed = data_feed
[docs]
def run(self, start_date: str = '2020-01-01', end_date: str = None) -> BacktestResults:
"""Run the backtest"""
if self.data_feed is None:
raise ValueError("No data feed configured")
if end_date is None:
end_date = datetime.now().strftime('%Y-%m-%d')
# Load data
self.data_feed.load_data(start_date, end_date)
self.strategy.data = self.data_feed.data
# Initialize portfolio
portfolio = Portfolio(self.initial_cash)
self.strategy.portfolio = portfolio
# Initialize strategy indicators
self.strategy.init_indicators()
# Get the common date range across all symbols
if not self.strategy.data:
raise ValueError("No data loaded")
# Find common date range
date_ranges = []
for symbol, data in self.strategy.data.items():
if not data.empty:
date_ranges.append((data.index[0], data.index[-1]))
if not date_ranges:
raise ValueError("No valid data found")
start_common = max(start for start, _ in date_ranges)
end_common = min(end for _, end in date_ranges)
# Align all data to common date range
aligned_data = {}
for symbol, data in self.strategy.data.items():
aligned_data[symbol] = data.loc[start_common:end_common]
self.strategy.data = aligned_data
if not aligned_data:
raise ValueError("No overlapping data found")
# Get the primary symbol for iteration
primary_symbol = list(aligned_data.keys())[0]
primary_data = aligned_data[primary_symbol]
# Run backtest
for i in range(len(primary_data)):
self.strategy.current_index = i
current_timestamp = primary_data.index[i]
# Update indicator values
current_prices = {}
for symbol, data in aligned_data.items():
if i < len(data):
current_price = data.iloc[i]['Close']
current_prices[symbol] = current_price
# Update indicators for this symbol
if symbol in self.strategy.indicators:
for indicator in self.strategy.indicators[symbol].values():
indicator.update(current_price)
# Update portfolio prices
portfolio.update_prices(current_prices, current_timestamp)
# Execute strategy logic
try:
self.strategy.next()
except Exception as e:
print(f"Strategy error at {current_timestamp}: {e}")
# Calculate metrics
metrics = PerformanceMetrics(
portfolio.equity_curve,
portfolio.timestamps,
portfolio.trades,
self.initial_cash
)
return BacktestResults(portfolio, metrics)
[docs]
class TechnicalIndicators:
"""Convenience class for technical indicator calculations"""
[docs]
@staticmethod
def sma(data: pd.Series, period: int) -> pd.Series:
"""Simple Moving Average"""
return data.rolling(window=period).mean()
[docs]
@staticmethod
def ema(data: pd.Series, period: int) -> pd.Series:
"""Exponential Moving Average"""
return data.ewm(span=period).mean()
[docs]
@staticmethod
def rsi(data: pd.Series, period: int = 14) -> pd.Series:
"""Relative Strength Index"""
delta = data.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
return 100 - (100 / (1 + rs))
[docs]
@staticmethod
def bollinger_bands(data: pd.Series, period: int = 20, std_dev: float = 2):
"""Bollinger Bands - returns (upper, middle, lower)"""
sma = data.rolling(window=period).mean()
std = data.rolling(window=period).std()
upper = sma + (std * std_dev)
lower = sma - (std * std_dev)
return upper, sma, lower
[docs]
@staticmethod
def macd(data: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9):
"""MACD - returns (macd_line, signal_line, histogram)"""
ema_fast = data.ewm(span=fast).mean()
ema_slow = data.ewm(span=slow).mean()
macd_line = ema_fast - ema_slow
signal_line = macd_line.ewm(span=signal).mean()
histogram = macd_line - signal_line
return macd_line, signal_line, histogram
# Export main classes and functions
__all__ = [
'BaseStrategy', 'Backtest', 'YFinanceDataFeed', 'TechnicalIndicators', 'indicators',
'Position', 'Trade', 'Portfolio', 'PerformanceMetrics', 'BacktestResults'
]