diff --git a/src/NT8.Core/MarketData/DataQualityValidator.cs b/src/NT8.Core/MarketData/DataQualityValidator.cs new file mode 100644 index 0000000..6e9f446 --- /dev/null +++ b/src/NT8.Core/MarketData/DataQualityValidator.cs @@ -0,0 +1,275 @@ +using System; +using System.Collections.Generic; +using NT8.Core.Common.Models; + +namespace NT8.Core.MarketData +{ + /// + /// Validates market data quality and detects anomalies + /// + public class DataQualityValidator + { + /// + /// Configuration for data quality validation + /// + public class DataQualityConfig + { + /// + /// Maximum allowed price change percentage between consecutive bars + /// + public double MaxPriceChangePercent { get; set; } + + /// + /// Minimum allowed volume for a bar to be considered valid + /// + public long MinVolume { get; set; } + + /// + /// Maximum allowed volume spike multiplier compared to average + /// + public double MaxVolumeSpikeMultiplier { get; set; } + + /// + /// Constructor for DataQualityConfig + /// + public DataQualityConfig() + { + MaxPriceChangePercent = 10.0; // 10% default + MinVolume = 1; // Minimum 1 volume + MaxVolumeSpikeMultiplier = 100.0; // 100x volume spike threshold + } + } + + private readonly DataQualityConfig _config; + private readonly Dictionary> _priceHistory; + private readonly Dictionary> _volumeHistory; + + /// + /// Constructor for DataQualityValidator + /// + public DataQualityValidator(DataQualityConfig config = null) + { + _config = config ?? new DataQualityConfig(); + _priceHistory = new Dictionary>(); + _volumeHistory = new Dictionary>(); + } + + /// + /// Validate bar data quality and detect anomalies + /// + public DataQualityResult ValidateBar(BarData currentBar, BarData previousBar = null) + { + if (currentBar == null) + throw new ArgumentNullException("currentBar"); + + var result = new DataQualityResult(); + + // Basic validation + if (!BasicBarValidation(currentBar)) + { + result.IsValid = false; + result.Reasons.Add("Basic validation failed"); + return result; + } + + // Previous bar validation + if (previousBar != null) + { + // Price change validation + var priceChangePercent = CalculatePriceChangePercent(previousBar.Close, currentBar.Open); + if (Math.Abs(priceChangePercent) > _config.MaxPriceChangePercent) + { + result.IsValid = false; + result.Reasons.Add(string.Format("Price change {0:F2}% exceeds threshold of {1:F2}%", + priceChangePercent, _config.MaxPriceChangePercent)); + } + + // Volume validation + if (currentBar.Volume < _config.MinVolume) + { + result.IsValid = false; + result.Reasons.Add(string.Format("Volume {0} below minimum threshold of {1}", + currentBar.Volume, _config.MinVolume)); + } + + // Volume spike detection + var avgVolume = CalculateAverageVolume(previousBar.Symbol); + if (avgVolume > 0 && currentBar.Volume > avgVolume * _config.MaxVolumeSpikeMultiplier) + { + result.IsValid = false; + result.Reasons.Add(string.Format("Volume spike detected: {0} vs average {1:F0}", + currentBar.Volume, avgVolume)); + } + } + + // Update history + UpdateHistory(currentBar); + + result.IsValid = result.Reasons.Count == 0; + return result; + } + + /// + /// Validate tick data quality and detect anomalies + /// + public DataQualityResult ValidateTick(TickData currentTick, TickData previousTick = null) + { + if (currentTick == null) + throw new ArgumentNullException("currentTick"); + + var result = new DataQualityResult(); + + // Basic validation + if (!BasicTickValidation(currentTick)) + { + result.IsValid = false; + result.Reasons.Add("Basic validation failed"); + return result; + } + + // Previous tick validation + if (previousTick != null) + { + // Price change validation + var priceChangePercent = CalculatePriceChangePercent(previousTick.Price, currentTick.Price); + if (Math.Abs(priceChangePercent) > _config.MaxPriceChangePercent) + { + result.IsValid = false; + result.Reasons.Add(string.Format("Price change {0:F2}% exceeds threshold of {1:F2}%", + priceChangePercent, _config.MaxPriceChangePercent)); + } + } + + result.IsValid = result.Reasons.Count == 0; + return result; + } + + /// + /// Basic bar validation + /// + private bool BasicBarValidation(BarData bar) + { + // Check for reasonable price values + if (bar.Open <= 0 || bar.High <= 0 || bar.Low <= 0 || bar.Close <= 0) + return false; + + // Check for valid high/low relationships + if (bar.High < bar.Low) + return false; + + // Check if close price is within high/low range + if (bar.Close < bar.Low || bar.Close > bar.High) + return false; + + // Check for reasonable volume + if (bar.Volume < 0) + return false; + + return true; + } + + /// + /// Basic tick validation + /// + private bool BasicTickValidation(TickData tick) + { + // Check for reasonable price values + if (tick.Price <= 0) + return false; + + // Check for reasonable size + if (tick.Size < 0) + return false; + + return true; + } + + /// + /// Calculate price change percentage + /// + private double CalculatePriceChangePercent(double previousPrice, double currentPrice) + { + if (previousPrice == 0) + return 0; + + return ((currentPrice - previousPrice) / previousPrice) * 100; + } + + /// + /// Calculate average volume for a symbol + /// + private double CalculateAverageVolume(string symbol) + { + List volumes; + if (!_volumeHistory.TryGetValue(symbol, out volumes) || volumes.Count == 0) + return 0; + + long sum = 0; + foreach (var volume in volumes) + { + sum += volume; + } + + return (double)sum / volumes.Count; + } + + /// + /// Update price and volume history + /// + private void UpdateHistory(BarData bar) + { + // Update price history + List prices; + if (!_priceHistory.TryGetValue(bar.Symbol, out prices)) + { + prices = new List(); + _priceHistory[bar.Symbol] = prices; + } + + prices.Add(bar.Close); + if (prices.Count > 100) // Keep only last 100 prices + { + prices.RemoveAt(0); + } + + // Update volume history + List volumes; + if (!_volumeHistory.TryGetValue(bar.Symbol, out volumes)) + { + volumes = new List(); + _volumeHistory[bar.Symbol] = volumes; + } + + volumes.Add(bar.Volume); + if (volumes.Count > 100) // Keep only last 100 volumes + { + volumes.RemoveAt(0); + } + } + } + + /// + /// Result of data quality validation + /// + public class DataQualityResult + { + /// + /// Whether the data is valid + /// + public bool IsValid { get; set; } + + /// + /// Reasons for validation failure + /// + public List Reasons { get; set; } + + /// + /// Constructor for DataQualityResult + /// + public DataQualityResult() + { + IsValid = true; + Reasons = new List(); + } + } +} diff --git a/src/NT8.Core/MarketData/HistoricalDataManager.cs b/src/NT8.Core/MarketData/HistoricalDataManager.cs new file mode 100644 index 0000000..03d7402 --- /dev/null +++ b/src/NT8.Core/MarketData/HistoricalDataManager.cs @@ -0,0 +1,262 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.IO.Compression; +using System.Text; +using System.Threading.Tasks; +using NT8.Core.Common.Models; + +namespace NT8.Core.MarketData +{ + /// + /// Manages historical market data storage, retrieval, and archiving + /// + public class HistoricalDataManager + { + private readonly string _dataDirectory; + private readonly bool _enableCompression; + + /// + /// Constructor for HistoricalDataManager + /// + public HistoricalDataManager(string dataDirectory = null, bool enableCompression = true) + { + _dataDirectory = dataDirectory ?? Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData), "NT8", "MarketData"); + _enableCompression = enableCompression; + + // Ensure data directory exists + if (!Directory.Exists(_dataDirectory)) + { + Directory.CreateDirectory(_dataDirectory); + } + } + + /// + /// Save historical bars to storage + /// + public async Task SaveHistoricalBars(string symbol, TimeSpan barSize, List bars) + { + if (string.IsNullOrEmpty(symbol)) + throw new ArgumentException("Symbol cannot be null or empty", "symbol"); + + if (bars == null) + throw new ArgumentNullException("bars"); + + var fileName = GenerateFileName(symbol, barSize, DateTime.UtcNow); + var filePath = Path.Combine(_dataDirectory, fileName); + + // Convert bars to CSV format + var csv = ConvertBarsToCsv(bars); + + if (_enableCompression) + { + await SaveCompressedFile(filePath + ".gz", csv); + } + else + { + await SaveTextFile(filePath + ".csv", csv); + } + } + + /// + /// Load historical bars from storage + /// + public async Task> LoadHistoricalBars(string symbol, TimeSpan barSize, DateTime date) + { + if (string.IsNullOrEmpty(symbol)) + throw new ArgumentException("Symbol cannot be null or empty", "symbol"); + + var fileName = GenerateFileName(symbol, barSize, date); + var filePath = Path.Combine(_dataDirectory, fileName); + + string csv; + if (_enableCompression) + { + filePath += ".gz"; + if (!File.Exists(filePath)) + return new List(); + + csv = await LoadCompressedFile(filePath); + } + else + { + filePath += ".csv"; + if (!File.Exists(filePath)) + return new List(); + + csv = await LoadTextFile(filePath); + } + + // Convert CSV back to BarData + return ConvertCsvToBars(csv); + } + + /// + /// Archive old data files + /// + public async Task ArchiveOldData(DateTime olderThan) + { + var archiveDirectory = Path.Combine(_dataDirectory, "Archive"); + if (!Directory.Exists(archiveDirectory)) + { + Directory.CreateDirectory(archiveDirectory); + } + + var files = Directory.GetFiles(_dataDirectory, "*.csv*"); + foreach (var file in files) + { + var fileName = Path.GetFileName(file); + var fileDate = ExtractDateFromFileName(fileName); + + if (fileDate < olderThan) + { + var archivePath = Path.Combine(archiveDirectory, fileName); + await Task.Run(() => File.Move(file, archivePath)); + } + } + } + + /// + /// Generate file name for data storage + /// + private string GenerateFileName(string symbol, TimeSpan barSize, DateTime date) + { + return string.Format("{0}_{1}_{2:yyyyMMdd}", symbol, (int)barSize.TotalMinutes, date); + } + + /// + /// Extract date from file name + /// + private DateTime ExtractDateFromFileName(string fileName) + { + // Extract date portion from file name + var parts = fileName.Split('_'); + if (parts.Length >= 3) + { + var datePart = parts[2].Split('.')[0]; // Remove extension + DateTime date; + if (DateTime.TryParseExact(datePart, "yyyyMMdd", null, System.Globalization.DateTimeStyles.None, out date)) + { + return date; + } + } + + return DateTime.MinValue; + } + + /// + /// Convert bars to CSV format + /// + private string ConvertBarsToCsv(List bars) + { + var sb = new StringBuilder(); + + // Header + sb.AppendLine("Symbol,Time,Open,High,Low,Close,Volume,BarSizeTicks"); + + // Data rows + foreach (var bar in bars) + { + sb.AppendFormat("{0},{1:yyyy-MM-dd HH:mm:ss},{2},{3},{4},{5},{6},{7}", + bar.Symbol, + bar.Time, + bar.Open, + bar.High, + bar.Low, + bar.Close, + bar.Volume, + bar.BarSize.Ticks); + sb.AppendLine(); + } + + return sb.ToString(); + } + + /// + /// Convert CSV to bars + /// + private List ConvertCsvToBars(string csv) + { + var bars = new List(); + var lines = csv.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries); + + // Skip header line + for (int i = 1; i < lines.Length; i++) + { + var fields = lines[i].Split(','); + if (fields.Length >= 8) + { + try + { + var bar = new BarData( + fields[0], // Symbol + DateTime.Parse(fields[1]), // Time + double.Parse(fields[2]), // Open + double.Parse(fields[3]), // High + double.Parse(fields[4]), // Low + double.Parse(fields[5]), // Close + long.Parse(fields[6]), // Volume + TimeSpan.FromTicks(long.Parse(fields[7])) // BarSize + ); + + bars.Add(bar); + } + catch + { + // Skip invalid rows + } + } + } + + return bars; + } + + /// + /// Save compressed file + /// + private async Task SaveCompressedFile(string filePath, string content) + { + using (var fileStream = new FileStream(filePath, FileMode.Create)) + using (var gzipStream = new GZipStream(fileStream, CompressionMode.Compress)) + using (var writer = new StreamWriter(gzipStream)) + { + await writer.WriteAsync(content); + } + } + + /// + /// Load compressed file + /// + private async Task LoadCompressedFile(string filePath) + { + using (var fileStream = new FileStream(filePath, FileMode.Open)) + using (var gzipStream = new GZipStream(fileStream, CompressionMode.Decompress)) + using (var reader = new StreamReader(gzipStream)) + { + return await reader.ReadToEndAsync(); + } + } + + /// + /// Save text file + /// + private async Task SaveTextFile(string filePath, string content) + { + using (var writer = new StreamWriter(filePath)) + { + await writer.WriteAsync(content); + } + } + + /// + /// Load text file + /// + private async Task LoadTextFile(string filePath) + { + using (var reader = new StreamReader(filePath)) + { + return await reader.ReadToEndAsync(); + } + } + } +} diff --git a/src/NT8.Core/MarketData/MarketDataProvider.cs b/src/NT8.Core/MarketData/MarketDataProvider.cs new file mode 100644 index 0000000..d2ac6c9 --- /dev/null +++ b/src/NT8.Core/MarketData/MarketDataProvider.cs @@ -0,0 +1,314 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using NT8.Core.Common.Models; + +namespace NT8.Core.MarketData +{ + /// + /// Base implementation of market data provider with caching and validation + /// + public class MarketDataProvider : IMarketDataProvider + { + // Data storage + private readonly ConcurrentDictionary> _barCache; + private readonly ConcurrentDictionary> _tickCache; + private readonly ConcurrentDictionary _currentPrices; + + // Subscriptions + private readonly ConcurrentDictionary>> _barSubscriptions; + private readonly ConcurrentDictionary>> _tickSubscriptions; + + // Configuration + private readonly int _maxCacheSize; + private readonly TimeSpan _dataFreshnessTimeout; + + /// + /// Constructor for MarketDataProvider + /// + public MarketDataProvider(int maxCacheSize = 10000, TimeSpan? dataFreshnessTimeout = null) + { + _barCache = new ConcurrentDictionary>(); + _tickCache = new ConcurrentDictionary>(); + _currentPrices = new ConcurrentDictionary(); + _barSubscriptions = new ConcurrentDictionary>>(); + _tickSubscriptions = new ConcurrentDictionary>>(); + + _maxCacheSize = maxCacheSize; + _dataFreshnessTimeout = dataFreshnessTimeout ?? TimeSpan.FromMinutes(5); + } + + /// + /// Subscribe to bar data + /// + public void SubscribeBars(string symbol, TimeSpan barSize, Action onBar) + { + if (string.IsNullOrEmpty(symbol)) + throw new ArgumentException("Symbol cannot be null or empty", "symbol"); + + if (onBar == null) + throw new ArgumentNullException("onBar"); + + string key = string.Format("{0}_{1}", symbol, barSize); + _barSubscriptions.AddOrUpdate( + key, + new List> { onBar }, + (k, list) => { list.Add(onBar); return list; } + ); + } + + /// + /// Subscribe to tick data + /// + public void SubscribeTicks(string symbol, Action onTick) + { + if (string.IsNullOrEmpty(symbol)) + throw new ArgumentException("Symbol cannot be null or empty", "symbol"); + + if (onTick == null) + throw new ArgumentNullException("onTick"); + + _tickSubscriptions.AddOrUpdate( + symbol, + new List> { onTick }, + (k, list) => { list.Add(onTick); return list; } + ); + } + + /// + /// Get historical bars + /// + public async Task> GetHistoricalBars(string symbol, TimeSpan barSize, int count) + { + if (string.IsNullOrEmpty(symbol)) + throw new ArgumentException("Symbol cannot be null or empty", "symbol"); + + if (count <= 0) + throw new ArgumentException("Count must be greater than zero", "count"); + + string key = string.Format("{0}_{1}", symbol, barSize); + + List bars; + if (_barCache.TryGetValue(key, out bars)) + { + // Return the most recent bars, up to the requested count + var result = bars + .OrderByDescending(b => b.Time) + .Take(count) + .ToList(); + + return await Task.FromResult(result); + } + + // Return empty list if no data is available + return await Task.FromResult(new List()); + } + + /// + /// Get current market price + /// + public double? GetCurrentPrice(string symbol) + { + if (string.IsNullOrEmpty(symbol)) + throw new ArgumentException("Symbol cannot be null or empty", "symbol"); + + double price; + if (_currentPrices.TryGetValue(symbol, out price)) + { + return price; + } + return null; + } + + /// + /// Add bar data to the provider + /// + public void AddBarData(BarData bar) + { + if (bar == null) + throw new ArgumentNullException("bar"); + + string key = string.Format("{0}_{1}", bar.Symbol, bar.BarSize); + + // Add to cache + _barCache.AddOrUpdate( + key, + new List { bar }, + (k, list) => + { + list.Add(bar); + // Trim cache if it exceeds maximum size + if (list.Count > _maxCacheSize) + { + list.RemoveRange(0, list.Count - _maxCacheSize); + } + return list; + } + ); + + // Update current price + _currentPrices[bar.Symbol] = bar.Close; + + // Notify subscribers + List> subscribers; + if (_barSubscriptions.TryGetValue(key, out subscribers)) + { + foreach (var subscriber in subscribers) + { + try + { + subscriber(bar); + } + catch + { + // Ignore exceptions in subscriber callbacks + } + } + } + } + + /// + /// Add tick data to the provider + /// + public void AddTickData(TickData tick) + { + if (tick == null) + throw new ArgumentNullException("tick"); + + // Add to cache + _tickCache.AddOrUpdate( + tick.Symbol, + new List { tick }, + (k, list) => + { + list.Add(tick); + // Trim cache if it exceeds maximum size + if (list.Count > _maxCacheSize) + { + list.RemoveRange(0, list.Count - _maxCacheSize); + } + return list; + } + ); + + // Update current price + _currentPrices[tick.Symbol] = tick.Price; + + // Notify subscribers + List> subscribers; + if (_tickSubscriptions.TryGetValue(tick.Symbol, out subscribers)) + { + foreach (var subscriber in subscribers) + { + try + { + subscriber(tick); + } + catch + { + // Ignore exceptions in subscriber callbacks + } + } + } + } + + /// + /// Validate market data quality + /// + public bool ValidateDataQuality(BarData bar) + { + if (bar == null) + return false; + + // Check for reasonable price values + if (bar.Open <= 0 || bar.High <= 0 || bar.Low <= 0 || bar.Close <= 0) + return false; + + // Check for valid high/low relationships + if (bar.High < bar.Low) + return false; + + // Check if close price is within high/low range + if (bar.Close < bar.Low || bar.Close > bar.High) + return false; + + // Check for reasonable volume + if (bar.Volume < 0) + return false; + + return true; + } + + /// + /// Validate market data quality + /// + public bool ValidateDataQuality(TickData tick) + { + if (tick == null) + return false; + + // Check for reasonable price values + if (tick.Price <= 0) + return false; + + // Check for reasonable size + if (tick.Size < 0) + return false; + + return true; + } + + /// + /// Get data freshness information + /// + public DateTime? GetLastUpdateTime(string symbol) + { + if (string.IsNullOrEmpty(symbol)) + return null; + + // Check bars for this symbol + var barKeys = _barCache.Keys.Where(k => k.StartsWith(string.Format("{0}_", symbol))).ToList(); + DateTime? latestBarTime = null; + + foreach (var key in barKeys) + { + List bars; + if (_barCache.TryGetValue(key, out bars) && bars.Count > 0) + { + var lastBarTime = bars.Max(b => b.Time); + if (latestBarTime == null || lastBarTime > latestBarTime) + { + latestBarTime = lastBarTime; + } + } + } + + // Check ticks for this symbol + List ticks; + if (_tickCache.TryGetValue(symbol, out ticks) && ticks.Count > 0) + { + var lastTickTime = ticks.Max(t => t.Time); + if (latestBarTime == null || lastTickTime > latestBarTime) + { + latestBarTime = lastTickTime; + } + } + + return latestBarTime; + } + + /// + /// Check if data is fresh + /// + public bool IsDataFresh(string symbol) + { + DateTime? lastUpdate = GetLastUpdateTime(symbol); + if (lastUpdate == null) + return false; + + return DateTime.UtcNow - lastUpdate.Value < _dataFreshnessTimeout; + } + } +}