Implement market data handling and validation components

This commit is contained in:
Billy Valentine
2025-09-09 18:36:30 -04:00
parent 63200fe9b4
commit 23bb431d42
3 changed files with 851 additions and 0 deletions

View File

@@ -0,0 +1,275 @@
using System;
using System.Collections.Generic;
using NT8.Core.Common.Models;
namespace NT8.Core.MarketData
{
/// <summary>
/// Validates market data quality and detects anomalies
/// </summary>
public class DataQualityValidator
{
/// <summary>
/// Configuration for data quality validation
/// </summary>
public class DataQualityConfig
{
/// <summary>
/// Maximum allowed price change percentage between consecutive bars
/// </summary>
public double MaxPriceChangePercent { get; set; }
/// <summary>
/// Minimum allowed volume for a bar to be considered valid
/// </summary>
public long MinVolume { get; set; }
/// <summary>
/// Maximum allowed volume spike multiplier compared to average
/// </summary>
public double MaxVolumeSpikeMultiplier { get; set; }
/// <summary>
/// Constructor for DataQualityConfig
/// </summary>
public DataQualityConfig()
{
MaxPriceChangePercent = 10.0; // 10% default
MinVolume = 1; // Minimum 1 volume
MaxVolumeSpikeMultiplier = 100.0; // 100x volume spike threshold
}
}
private readonly DataQualityConfig _config;
private readonly Dictionary<string, List<double>> _priceHistory;
private readonly Dictionary<string, List<long>> _volumeHistory;
/// <summary>
/// Constructor for DataQualityValidator
/// </summary>
public DataQualityValidator(DataQualityConfig config = null)
{
_config = config ?? new DataQualityConfig();
_priceHistory = new Dictionary<string, List<double>>();
_volumeHistory = new Dictionary<string, List<long>>();
}
/// <summary>
/// Validate bar data quality and detect anomalies
/// </summary>
public DataQualityResult ValidateBar(BarData currentBar, BarData previousBar = null)
{
if (currentBar == null)
throw new ArgumentNullException("currentBar");
var result = new DataQualityResult();
// Basic validation
if (!BasicBarValidation(currentBar))
{
result.IsValid = false;
result.Reasons.Add("Basic validation failed");
return result;
}
// Previous bar validation
if (previousBar != null)
{
// Price change validation
var priceChangePercent = CalculatePriceChangePercent(previousBar.Close, currentBar.Open);
if (Math.Abs(priceChangePercent) > _config.MaxPriceChangePercent)
{
result.IsValid = false;
result.Reasons.Add(string.Format("Price change {0:F2}% exceeds threshold of {1:F2}%",
priceChangePercent, _config.MaxPriceChangePercent));
}
// Volume validation
if (currentBar.Volume < _config.MinVolume)
{
result.IsValid = false;
result.Reasons.Add(string.Format("Volume {0} below minimum threshold of {1}",
currentBar.Volume, _config.MinVolume));
}
// Volume spike detection
var avgVolume = CalculateAverageVolume(previousBar.Symbol);
if (avgVolume > 0 && currentBar.Volume > avgVolume * _config.MaxVolumeSpikeMultiplier)
{
result.IsValid = false;
result.Reasons.Add(string.Format("Volume spike detected: {0} vs average {1:F0}",
currentBar.Volume, avgVolume));
}
}
// Update history
UpdateHistory(currentBar);
result.IsValid = result.Reasons.Count == 0;
return result;
}
/// <summary>
/// Validate tick data quality and detect anomalies
/// </summary>
public DataQualityResult ValidateTick(TickData currentTick, TickData previousTick = null)
{
if (currentTick == null)
throw new ArgumentNullException("currentTick");
var result = new DataQualityResult();
// Basic validation
if (!BasicTickValidation(currentTick))
{
result.IsValid = false;
result.Reasons.Add("Basic validation failed");
return result;
}
// Previous tick validation
if (previousTick != null)
{
// Price change validation
var priceChangePercent = CalculatePriceChangePercent(previousTick.Price, currentTick.Price);
if (Math.Abs(priceChangePercent) > _config.MaxPriceChangePercent)
{
result.IsValid = false;
result.Reasons.Add(string.Format("Price change {0:F2}% exceeds threshold of {1:F2}%",
priceChangePercent, _config.MaxPriceChangePercent));
}
}
result.IsValid = result.Reasons.Count == 0;
return result;
}
/// <summary>
/// Basic bar validation
/// </summary>
private bool BasicBarValidation(BarData bar)
{
// Check for reasonable price values
if (bar.Open <= 0 || bar.High <= 0 || bar.Low <= 0 || bar.Close <= 0)
return false;
// Check for valid high/low relationships
if (bar.High < bar.Low)
return false;
// Check if close price is within high/low range
if (bar.Close < bar.Low || bar.Close > bar.High)
return false;
// Check for reasonable volume
if (bar.Volume < 0)
return false;
return true;
}
/// <summary>
/// Basic tick validation
/// </summary>
private bool BasicTickValidation(TickData tick)
{
// Check for reasonable price values
if (tick.Price <= 0)
return false;
// Check for reasonable size
if (tick.Size < 0)
return false;
return true;
}
/// <summary>
/// Calculate price change percentage
/// </summary>
private double CalculatePriceChangePercent(double previousPrice, double currentPrice)
{
if (previousPrice == 0)
return 0;
return ((currentPrice - previousPrice) / previousPrice) * 100;
}
/// <summary>
/// Calculate average volume for a symbol
/// </summary>
private double CalculateAverageVolume(string symbol)
{
List<long> volumes;
if (!_volumeHistory.TryGetValue(symbol, out volumes) || volumes.Count == 0)
return 0;
long sum = 0;
foreach (var volume in volumes)
{
sum += volume;
}
return (double)sum / volumes.Count;
}
/// <summary>
/// Update price and volume history
/// </summary>
private void UpdateHistory(BarData bar)
{
// Update price history
List<double> prices;
if (!_priceHistory.TryGetValue(bar.Symbol, out prices))
{
prices = new List<double>();
_priceHistory[bar.Symbol] = prices;
}
prices.Add(bar.Close);
if (prices.Count > 100) // Keep only last 100 prices
{
prices.RemoveAt(0);
}
// Update volume history
List<long> volumes;
if (!_volumeHistory.TryGetValue(bar.Symbol, out volumes))
{
volumes = new List<long>();
_volumeHistory[bar.Symbol] = volumes;
}
volumes.Add(bar.Volume);
if (volumes.Count > 100) // Keep only last 100 volumes
{
volumes.RemoveAt(0);
}
}
}
/// <summary>
/// Result of data quality validation
/// </summary>
public class DataQualityResult
{
/// <summary>
/// Whether the data is valid
/// </summary>
public bool IsValid { get; set; }
/// <summary>
/// Reasons for validation failure
/// </summary>
public List<string> Reasons { get; set; }
/// <summary>
/// Constructor for DataQualityResult
/// </summary>
public DataQualityResult()
{
IsValid = true;
Reasons = new List<string>();
}
}
}

View File

@@ -0,0 +1,262 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Text;
using System.Threading.Tasks;
using NT8.Core.Common.Models;
namespace NT8.Core.MarketData
{
/// <summary>
/// Manages historical market data storage, retrieval, and archiving
/// </summary>
public class HistoricalDataManager
{
private readonly string _dataDirectory;
private readonly bool _enableCompression;
/// <summary>
/// Constructor for HistoricalDataManager
/// </summary>
public HistoricalDataManager(string dataDirectory = null, bool enableCompression = true)
{
_dataDirectory = dataDirectory ?? Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData), "NT8", "MarketData");
_enableCompression = enableCompression;
// Ensure data directory exists
if (!Directory.Exists(_dataDirectory))
{
Directory.CreateDirectory(_dataDirectory);
}
}
/// <summary>
/// Save historical bars to storage
/// </summary>
public async Task SaveHistoricalBars(string symbol, TimeSpan barSize, List<BarData> bars)
{
if (string.IsNullOrEmpty(symbol))
throw new ArgumentException("Symbol cannot be null or empty", "symbol");
if (bars == null)
throw new ArgumentNullException("bars");
var fileName = GenerateFileName(symbol, barSize, DateTime.UtcNow);
var filePath = Path.Combine(_dataDirectory, fileName);
// Convert bars to CSV format
var csv = ConvertBarsToCsv(bars);
if (_enableCompression)
{
await SaveCompressedFile(filePath + ".gz", csv);
}
else
{
await SaveTextFile(filePath + ".csv", csv);
}
}
/// <summary>
/// Load historical bars from storage
/// </summary>
public async Task<List<BarData>> LoadHistoricalBars(string symbol, TimeSpan barSize, DateTime date)
{
if (string.IsNullOrEmpty(symbol))
throw new ArgumentException("Symbol cannot be null or empty", "symbol");
var fileName = GenerateFileName(symbol, barSize, date);
var filePath = Path.Combine(_dataDirectory, fileName);
string csv;
if (_enableCompression)
{
filePath += ".gz";
if (!File.Exists(filePath))
return new List<BarData>();
csv = await LoadCompressedFile(filePath);
}
else
{
filePath += ".csv";
if (!File.Exists(filePath))
return new List<BarData>();
csv = await LoadTextFile(filePath);
}
// Convert CSV back to BarData
return ConvertCsvToBars(csv);
}
/// <summary>
/// Archive old data files
/// </summary>
public async Task ArchiveOldData(DateTime olderThan)
{
var archiveDirectory = Path.Combine(_dataDirectory, "Archive");
if (!Directory.Exists(archiveDirectory))
{
Directory.CreateDirectory(archiveDirectory);
}
var files = Directory.GetFiles(_dataDirectory, "*.csv*");
foreach (var file in files)
{
var fileName = Path.GetFileName(file);
var fileDate = ExtractDateFromFileName(fileName);
if (fileDate < olderThan)
{
var archivePath = Path.Combine(archiveDirectory, fileName);
await Task.Run(() => File.Move(file, archivePath));
}
}
}
/// <summary>
/// Generate file name for data storage
/// </summary>
private string GenerateFileName(string symbol, TimeSpan barSize, DateTime date)
{
return string.Format("{0}_{1}_{2:yyyyMMdd}", symbol, (int)barSize.TotalMinutes, date);
}
/// <summary>
/// Extract date from file name
/// </summary>
private DateTime ExtractDateFromFileName(string fileName)
{
// Extract date portion from file name
var parts = fileName.Split('_');
if (parts.Length >= 3)
{
var datePart = parts[2].Split('.')[0]; // Remove extension
DateTime date;
if (DateTime.TryParseExact(datePart, "yyyyMMdd", null, System.Globalization.DateTimeStyles.None, out date))
{
return date;
}
}
return DateTime.MinValue;
}
/// <summary>
/// Convert bars to CSV format
/// </summary>
private string ConvertBarsToCsv(List<BarData> bars)
{
var sb = new StringBuilder();
// Header
sb.AppendLine("Symbol,Time,Open,High,Low,Close,Volume,BarSizeTicks");
// Data rows
foreach (var bar in bars)
{
sb.AppendFormat("{0},{1:yyyy-MM-dd HH:mm:ss},{2},{3},{4},{5},{6},{7}",
bar.Symbol,
bar.Time,
bar.Open,
bar.High,
bar.Low,
bar.Close,
bar.Volume,
bar.BarSize.Ticks);
sb.AppendLine();
}
return sb.ToString();
}
/// <summary>
/// Convert CSV to bars
/// </summary>
private List<BarData> ConvertCsvToBars(string csv)
{
var bars = new List<BarData>();
var lines = csv.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries);
// Skip header line
for (int i = 1; i < lines.Length; i++)
{
var fields = lines[i].Split(',');
if (fields.Length >= 8)
{
try
{
var bar = new BarData(
fields[0], // Symbol
DateTime.Parse(fields[1]), // Time
double.Parse(fields[2]), // Open
double.Parse(fields[3]), // High
double.Parse(fields[4]), // Low
double.Parse(fields[5]), // Close
long.Parse(fields[6]), // Volume
TimeSpan.FromTicks(long.Parse(fields[7])) // BarSize
);
bars.Add(bar);
}
catch
{
// Skip invalid rows
}
}
}
return bars;
}
/// <summary>
/// Save compressed file
/// </summary>
private async Task SaveCompressedFile(string filePath, string content)
{
using (var fileStream = new FileStream(filePath, FileMode.Create))
using (var gzipStream = new GZipStream(fileStream, CompressionMode.Compress))
using (var writer = new StreamWriter(gzipStream))
{
await writer.WriteAsync(content);
}
}
/// <summary>
/// Load compressed file
/// </summary>
private async Task<string> LoadCompressedFile(string filePath)
{
using (var fileStream = new FileStream(filePath, FileMode.Open))
using (var gzipStream = new GZipStream(fileStream, CompressionMode.Decompress))
using (var reader = new StreamReader(gzipStream))
{
return await reader.ReadToEndAsync();
}
}
/// <summary>
/// Save text file
/// </summary>
private async Task SaveTextFile(string filePath, string content)
{
using (var writer = new StreamWriter(filePath))
{
await writer.WriteAsync(content);
}
}
/// <summary>
/// Load text file
/// </summary>
private async Task<string> LoadTextFile(string filePath)
{
using (var reader = new StreamReader(filePath))
{
return await reader.ReadToEndAsync();
}
}
}
}

View File

@@ -0,0 +1,314 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using NT8.Core.Common.Models;
namespace NT8.Core.MarketData
{
/// <summary>
/// Base implementation of market data provider with caching and validation
/// </summary>
public class MarketDataProvider : IMarketDataProvider
{
// Data storage
private readonly ConcurrentDictionary<string, List<BarData>> _barCache;
private readonly ConcurrentDictionary<string, List<TickData>> _tickCache;
private readonly ConcurrentDictionary<string, double> _currentPrices;
// Subscriptions
private readonly ConcurrentDictionary<string, List<Action<BarData>>> _barSubscriptions;
private readonly ConcurrentDictionary<string, List<Action<TickData>>> _tickSubscriptions;
// Configuration
private readonly int _maxCacheSize;
private readonly TimeSpan _dataFreshnessTimeout;
/// <summary>
/// Constructor for MarketDataProvider
/// </summary>
public MarketDataProvider(int maxCacheSize = 10000, TimeSpan? dataFreshnessTimeout = null)
{
_barCache = new ConcurrentDictionary<string, List<BarData>>();
_tickCache = new ConcurrentDictionary<string, List<TickData>>();
_currentPrices = new ConcurrentDictionary<string, double>();
_barSubscriptions = new ConcurrentDictionary<string, List<Action<BarData>>>();
_tickSubscriptions = new ConcurrentDictionary<string, List<Action<TickData>>>();
_maxCacheSize = maxCacheSize;
_dataFreshnessTimeout = dataFreshnessTimeout ?? TimeSpan.FromMinutes(5);
}
/// <summary>
/// Subscribe to bar data
/// </summary>
public void SubscribeBars(string symbol, TimeSpan barSize, Action<BarData> onBar)
{
if (string.IsNullOrEmpty(symbol))
throw new ArgumentException("Symbol cannot be null or empty", "symbol");
if (onBar == null)
throw new ArgumentNullException("onBar");
string key = string.Format("{0}_{1}", symbol, barSize);
_barSubscriptions.AddOrUpdate(
key,
new List<Action<BarData>> { onBar },
(k, list) => { list.Add(onBar); return list; }
);
}
/// <summary>
/// Subscribe to tick data
/// </summary>
public void SubscribeTicks(string symbol, Action<TickData> onTick)
{
if (string.IsNullOrEmpty(symbol))
throw new ArgumentException("Symbol cannot be null or empty", "symbol");
if (onTick == null)
throw new ArgumentNullException("onTick");
_tickSubscriptions.AddOrUpdate(
symbol,
new List<Action<TickData>> { onTick },
(k, list) => { list.Add(onTick); return list; }
);
}
/// <summary>
/// Get historical bars
/// </summary>
public async Task<List<BarData>> GetHistoricalBars(string symbol, TimeSpan barSize, int count)
{
if (string.IsNullOrEmpty(symbol))
throw new ArgumentException("Symbol cannot be null or empty", "symbol");
if (count <= 0)
throw new ArgumentException("Count must be greater than zero", "count");
string key = string.Format("{0}_{1}", symbol, barSize);
List<BarData> bars;
if (_barCache.TryGetValue(key, out bars))
{
// Return the most recent bars, up to the requested count
var result = bars
.OrderByDescending(b => b.Time)
.Take(count)
.ToList();
return await Task.FromResult(result);
}
// Return empty list if no data is available
return await Task.FromResult(new List<BarData>());
}
/// <summary>
/// Get current market price
/// </summary>
public double? GetCurrentPrice(string symbol)
{
if (string.IsNullOrEmpty(symbol))
throw new ArgumentException("Symbol cannot be null or empty", "symbol");
double price;
if (_currentPrices.TryGetValue(symbol, out price))
{
return price;
}
return null;
}
/// <summary>
/// Add bar data to the provider
/// </summary>
public void AddBarData(BarData bar)
{
if (bar == null)
throw new ArgumentNullException("bar");
string key = string.Format("{0}_{1}", bar.Symbol, bar.BarSize);
// Add to cache
_barCache.AddOrUpdate(
key,
new List<BarData> { bar },
(k, list) =>
{
list.Add(bar);
// Trim cache if it exceeds maximum size
if (list.Count > _maxCacheSize)
{
list.RemoveRange(0, list.Count - _maxCacheSize);
}
return list;
}
);
// Update current price
_currentPrices[bar.Symbol] = bar.Close;
// Notify subscribers
List<Action<BarData>> subscribers;
if (_barSubscriptions.TryGetValue(key, out subscribers))
{
foreach (var subscriber in subscribers)
{
try
{
subscriber(bar);
}
catch
{
// Ignore exceptions in subscriber callbacks
}
}
}
}
/// <summary>
/// Add tick data to the provider
/// </summary>
public void AddTickData(TickData tick)
{
if (tick == null)
throw new ArgumentNullException("tick");
// Add to cache
_tickCache.AddOrUpdate(
tick.Symbol,
new List<TickData> { tick },
(k, list) =>
{
list.Add(tick);
// Trim cache if it exceeds maximum size
if (list.Count > _maxCacheSize)
{
list.RemoveRange(0, list.Count - _maxCacheSize);
}
return list;
}
);
// Update current price
_currentPrices[tick.Symbol] = tick.Price;
// Notify subscribers
List<Action<TickData>> subscribers;
if (_tickSubscriptions.TryGetValue(tick.Symbol, out subscribers))
{
foreach (var subscriber in subscribers)
{
try
{
subscriber(tick);
}
catch
{
// Ignore exceptions in subscriber callbacks
}
}
}
}
/// <summary>
/// Validate market data quality
/// </summary>
public bool ValidateDataQuality(BarData bar)
{
if (bar == null)
return false;
// Check for reasonable price values
if (bar.Open <= 0 || bar.High <= 0 || bar.Low <= 0 || bar.Close <= 0)
return false;
// Check for valid high/low relationships
if (bar.High < bar.Low)
return false;
// Check if close price is within high/low range
if (bar.Close < bar.Low || bar.Close > bar.High)
return false;
// Check for reasonable volume
if (bar.Volume < 0)
return false;
return true;
}
/// <summary>
/// Validate market data quality
/// </summary>
public bool ValidateDataQuality(TickData tick)
{
if (tick == null)
return false;
// Check for reasonable price values
if (tick.Price <= 0)
return false;
// Check for reasonable size
if (tick.Size < 0)
return false;
return true;
}
/// <summary>
/// Get data freshness information
/// </summary>
public DateTime? GetLastUpdateTime(string symbol)
{
if (string.IsNullOrEmpty(symbol))
return null;
// Check bars for this symbol
var barKeys = _barCache.Keys.Where(k => k.StartsWith(string.Format("{0}_", symbol))).ToList();
DateTime? latestBarTime = null;
foreach (var key in barKeys)
{
List<BarData> bars;
if (_barCache.TryGetValue(key, out bars) && bars.Count > 0)
{
var lastBarTime = bars.Max(b => b.Time);
if (latestBarTime == null || lastBarTime > latestBarTime)
{
latestBarTime = lastBarTime;
}
}
}
// Check ticks for this symbol
List<TickData> ticks;
if (_tickCache.TryGetValue(symbol, out ticks) && ticks.Count > 0)
{
var lastTickTime = ticks.Max(t => t.Time);
if (latestBarTime == null || lastTickTime > latestBarTime)
{
latestBarTime = lastTickTime;
}
}
return latestBarTime;
}
/// <summary>
/// Check if data is fresh
/// </summary>
public bool IsDataFresh(string symbol)
{
DateTime? lastUpdate = GetLastUpdateTime(symbol);
if (lastUpdate == null)
return false;
return DateTime.UtcNow - lastUpdate.Value < _dataFreshnessTimeout;
}
}
}