FireHorse by Infodinamica Limitada

<PackageReference Include="FireHorse" Version="0.0.8" />

 FireHorseManager

public class FireHorseManager
using FireHorse.Dto;
using FireHorse.Mappers;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

namespace FireHorse
{
    /// <summary>
    /// Process-wide scraper scheduler. Requests are grouped into one queue per URL authority
    /// ("domain"); each queue is drained by its own background consumer, and at most
    /// <see cref="MaxRunningElementsByDomain"/> downloads per domain are in flight at once.
    /// </summary>
    public class FireHorseManager
    {
        // Delay before re-issuing a download that failed with a WebException.
        private const int RetryDelayMilliseconds = 2000;

        // Poll interval of a consumer whose queue is currently empty.
        private const int EmptyQueuePollMilliseconds = 3000;

        // Poll interval used by Stop() while waiting for consumers and downloads to finish.
        private const int StopPollMilliseconds = 2000;

        // Throttle back-off grows by this step per consecutive throttle...
        private const int ThrottleDelayStepMilliseconds = 50;

        // ...but never beyond this cap, so a long-saturated domain still reacts soon after slots free up.
        private const int MaxThrottleDelayMilliseconds = 5000;

        private static readonly Lazy<FireHorseManager> LazyInstance =
            new Lazy<FireHorseManager>(() => new FireHorseManager());

        // Read by consumer loops on other threads, hence volatile.
        private volatile bool _canRun;
        private volatile bool _isRunning;

        // Guards creation/removal of per-domain queues and their consumer registrations.
        private readonly object LockerObj = new object();

        // Pending work, keyed by lower-cased URL authority.
        private readonly ConcurrentDictionary<string, ConcurrentQueue<ScraperDataWrapper>> Queues =
            new ConcurrentDictionary<string, ConcurrentQueue<ScraperDataWrapper>>();

        // In-flight downloads: running id (a GUID string) -> domain.
        private readonly ConcurrentDictionary<string, string> Running =
            new ConcurrentDictionary<string, string>(160, 9999);

        // End-of-process subscribers, keyed by the id returned from SubscribeToEndProcess.
        private readonly ConcurrentDictionary<string, Action> SubscriptionOnEnd =
            new ConcurrentDictionary<string, Action>();

        // Consumer task per domain; replaced wholesale by Stop().
        private volatile ConcurrentDictionary<string, Task> _queueThreads =
            new ConcurrentDictionary<string, Task>();

        private FireHorseManager()
        {
            _canRun = true;
            _isRunning = true;
        }

        /// <summary>Gets the single shared manager instance (created lazily, thread-safe).</summary>
        public static FireHorseManager Instance => LazyInstance.Value;

        /// <summary>Maximum number of concurrent downloads per domain (default 40).</summary>
        public int MaxRunningElementsByDomain { get; set; } = 40;

        /// <summary>
        /// Maximum retries for a download failing with a <see cref="WebException"/> (default 5).
        /// Also bounds how many empty polls a consumer tolerates before retiring its queue.
        /// </summary>
        public int MaxRetryCount { get; set; } = 5;

        /// <summary>Number of downloads currently in flight across all domains.</summary>
        public int CurrentRunningSize => Running.Count;

        /// <summary>Number of in-flight downloads grouped by domain.</summary>
        public IDictionary<string, int> CurrentRunningSizeByDomain =>
            Running.GroupBy(x => x.Value).ToDictionary(g => g.Key, g => g.Count());

        /// <summary>Total number of queued (not yet dispatched) requests across all domains.</summary>
        public int CurrentQueueSize => Queues.Sum(x => x.Value.Count);

        /// <summary>Number of per-domain queues that currently exist.</summary>
        public int CurrentQueues => Queues.Count;

        /// <summary>True when there are neither queues nor in-flight downloads.</summary>
        public bool IsEnded => Queues.IsEmpty && Running.IsEmpty;

        /// <summary>True unless <see cref="Stop"/> has completed and <see cref="Start"/> has not been called since.</summary>
        public bool IsActive => _isRunning;

        /// <summary>
        /// Queues a scrape request under the lower-cased authority of its URL, starting a consumer
        /// for that domain if none exists yet.
        /// </summary>
        /// <param name="data">The request; <c>Url</c> and <c>OnDataArrived</c> are required.</param>
        /// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
        /// <exception cref="ArgumentException">
        /// The URL is blank or not an absolute URI, or <c>OnDataArrived</c> is null.
        /// </exception>
        public void Enqueue(ScraperData data)
        {
            if (data == null)
            {
                throw new ArgumentNullException(nameof(data));
            }
            if (string.IsNullOrWhiteSpace(data.Url))
            {
                throw new ArgumentException("URL is required.");
            }
            if (data.OnDataArrived == null)
            {
                throw new ArgumentException("OnDataArrived is required.");
            }
            // Absolute only: Uri.Authority throws InvalidOperationException for relative URIs,
            // so accepting them here would merely defer the failure.
            if (!Uri.TryCreate(data.Url, UriKind.Absolute, out Uri uri))
            {
                throw new ArgumentException($"URL '{data.Url}' is invalid", nameof(data));
            }

            string domain = uri.Authority.ToLowerInvariant();
            ScraperDataWrapper wrapper = ScrapperMapper.ToWrapper(data, domain, uri);

            // Same lock as TryRetireQueue: a queue is either still registered (and has a live
            // consumer) or was removed while empty, in which case a fresh one is created here.
            lock (LockerObj)
            {
                if (Queues.TryGetValue(domain, out ConcurrentQueue<ScraperDataWrapper> existing))
                {
                    existing.Enqueue(wrapper);
                    return;
                }

                ConcurrentQueue<ScraperDataWrapper> queue = new ConcurrentQueue<ScraperDataWrapper>();
                queue.Enqueue(wrapper);
                if (!Queues.TryAdd(domain, queue))
                {
                    throw new InvalidOperationException("Unexpected error when try to create a new Queue for domain " + domain);
                }
                _queueThreads[domain] = StartConsumer(domain, queue);
            }
        }

        /// <summary>
        /// Allows consumers to run again and, if a previous <see cref="Stop"/> has completed,
        /// restarts a consumer for every queue that still holds work.
        /// </summary>
        public void Start()
        {
            _canRun = true;
            if (_isRunning)
            {
                return;
            }

            lock (LockerObj)
            {
                if (_isRunning)
                {
                    return;
                }
                _isRunning = true;
                foreach (KeyValuePair<string, ConcurrentQueue<ScraperDataWrapper>> entry in Queues)
                {
                    // Indexer (not TryAdd) so a stale, already-completed task never stays registered.
                    _queueThreads[entry.Key] = StartConsumer(entry.Key, entry.Value);
                }
            }
        }

        /// <summary>
        /// Stops dispatching new downloads and blocks until every consumer has exited and every
        /// in-flight download has finished. Queued items that were not dispatched are kept for
        /// a later <see cref="Start"/>.
        /// </summary>
        public void Stop()
        {
            _canRun = false;
            while (_queueThreads.Any(x => !x.Value.IsCompleted))
            {
                Thread.Sleep(StopPollMilliseconds);
            }
            while (CurrentRunningSize > 0)
            {
                Thread.Sleep(StopPollMilliseconds);
            }
            _isRunning = false;
            _queueThreads = new ConcurrentDictionary<string, Task>();
        }

        /// <summary>Registers a callback invoked when no queues and no in-flight downloads remain.</summary>
        /// <param name="subscription">Callback to invoke.</param>
        /// <returns>A key that can be passed to <see cref="UnsuscribeToEndProcess"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="subscription"/> is null.</exception>
        public string SubscribeToEndProcess(Action subscription)
        {
            if (subscription == null)
            {
                throw new ArgumentNullException(nameof(subscription), "The subscription parameter is required. It cannot be null");
            }

            string key = Guid.NewGuid().ToString();
            if (!SubscriptionOnEnd.TryAdd(key, subscription))
            {
                throw new InvalidOperationException("Unable to add new subscription for end process");
            }
            return key;
        }

        /// <summary>
        /// Removes a callback registered with <see cref="SubscribeToEndProcess"/>.
        /// (The misspelled name is kept for API compatibility.)
        /// </summary>
        /// <param name="key">Key returned by <see cref="SubscribeToEndProcess"/>.</param>
        /// <exception cref="ArgumentNullException"><paramref name="key"/> is null or blank.</exception>
        /// <exception cref="ArgumentException"><paramref name="key"/> is not a registered subscription.</exception>
        public void UnsuscribeToEndProcess(string key)
        {
            if (string.IsNullOrWhiteSpace(key))
            {
                throw new ArgumentNullException(nameof(key), "The key parameter is requiered. It cannot be empty or null");
            }
            if (!SubscriptionOnEnd.ContainsKey(key))
            {
                throw new ArgumentException("The key parameter does not exists in subscription list", nameof(key));
            }
            if (!SubscriptionOnEnd.TryRemove(key, out Action _))
            {
                throw new InvalidOperationException("Unable to remove the subscription for key " + key);
            }
        }

        // Starts the consumer loop for one domain queue. Task.Run(Func<Task>) returns a proxy that
        // completes only when the whole async loop ends, so Stop() can genuinely wait on it.
        private Task StartConsumer(string domain, ConcurrentQueue<ScraperDataWrapper> queue)
        {
            return Task.Run(() => ConsumeFromQueueAsync(domain, queue));
        }

        // Drains one domain queue: dispatches items while the domain is below its concurrency limit,
        // backs off when saturated, and retires the queue after staying empty for a while.
        private async Task ConsumeFromQueueAsync(string domain, ConcurrentQueue<ScraperDataWrapper> queue)
        {
            int throttledCount = 0;
            int emptyQueueCount = 0;

            while (_canRun)
            {
                ScraperDataWrapper item;
                while (_canRun && queue.TryDequeue(out item))
                {
                    emptyQueueCount = 0;

                    if (CountRunning(item.Domain) >= MaxRunningElementsByDomain)
                    {
                        // Domain saturated: put the item back and back off linearly, up to a cap.
                        throttledCount++;
                        queue.Enqueue(item);
                        int steps = Math.Min(throttledCount, MaxThrottleDelayMilliseconds / ThrottleDelayStepMilliseconds);
                        await Task.Delay(steps * ThrottleDelayStepMilliseconds).ConfigureAwait(false);
                        break;
                    }

                    throttledCount = 0;
                    string runningId = Guid.NewGuid().ToString();
                    if (Running.TryAdd(runningId, item.Domain))
                    {
                        ScraperDataWrapper dispatched = item;
                        _ = Task.Run(() => GetDataFromWebServer(runningId, dispatched, 0));
                    }
                    else
                    {
                        queue.Enqueue(item);
                    }
                }

                if (!queue.IsEmpty)
                {
                    continue;
                }

                if (emptyQueueCount > MaxRetryCount)
                {
                    if (TryRetireQueue(domain, queue))
                    {
                        CheckIfProcessIsFinishedInBackground();
                        return;
                    }
                    // An item arrived after the last poll but before retirement: keep consuming.
                    emptyQueueCount = 0;
                    continue;
                }

                emptyQueueCount++;
                await Task.Delay(EmptyQueuePollMilliseconds).ConfigureAwait(false);
            }

            // Stopped: drop the queue only if it is empty; otherwise Start() will resume it.
            TryRetireQueue(domain, queue);
            CheckIfProcessIsFinishedInBackground();
        }

        // Under LockerObj, removes the domain's queue and consumer registration if the queue is empty.
        // Returns false when the queue still holds items (the caller must keep consuming or leave it
        // for Start()); returns true when the queue was removed or is no longer the registered one.
        private bool TryRetireQueue(string domain, ConcurrentQueue<ScraperDataWrapper> queue)
        {
            lock (LockerObj)
            {
                ConcurrentQueue<ScraperDataWrapper> registered;
                if (!Queues.TryGetValue(domain, out registered) || !ReferenceEquals(registered, queue))
                {
                    return true;
                }
                if (!queue.IsEmpty)
                {
                    return false;
                }
                Queues.TryRemove(domain, out _);
                _queueThreads.TryRemove(domain, out _);
                return true;
            }
        }

        // Runs the end-of-process check (and thus subscriber callbacks) outside the consumer task,
        // so a callback that calls Stop() is not waiting on the very consumer that invoked it.
        private void CheckIfProcessIsFinishedInBackground()
        {
            Task.Run(() => CheckIfProcessIsFinished());
        }

        // Number of in-flight downloads recorded for the given domain.
        private int CountRunning(string domain)
        {
            return Running.Count(x => x.Value == domain);
        }

        // Invokes OnDequeue, then starts the download matching the item's ScraperType.
        // Any synchronous failure is routed to the retry/error handler.
        private void GetDataFromWebServer(string runningId, ScraperDataWrapper item, int retryCount = 0)
        {
            try
            {
                ScraperDataResponse response = ScrapperMapper.ToResponse(item);
                item.OnDequeue?.Invoke(response);
                switch (item.ScraperType)
                {
                    case ScraperType.String:
                        ProcessAsHtml(runningId, item, response, retryCount);
                        break;
                    case ScraperType.Binary:
                        ProcessAsBinary(runningId, item, response, retryCount);
                        break;
                    default:
                        throw new NotSupportedException($"ScraperType {item.ScraperType} not valid");
                }
            }
            catch (Exception ex)
            {
                ExceptionHandlerOnDownloadData(ex, item, runningId, retryCount);
            }
        }

        // Downloads the URI as a string and delivers it via OnDataArrived.
        private void ProcessAsHtml(string runningId, ScraperDataWrapper wrapper, ScraperDataResponse response, int retryCount)
        {
            WebClient webClient = CreateWebClient(wrapper);
            webClient.DownloadStringCompleted += (sender, args) =>
            {
                // The operation has finished, so the client can be released now (not before).
                webClient.Dispose();
                if (args.Error != null)
                {
                    ExceptionHandlerOnDownloadData(args.Error, wrapper, runningId, retryCount);
                    return;
                }
                response.Response = args.Result;
                OnDownloadSucceeded(runningId, wrapper, response);
            };

            try
            {
                webClient.DownloadStringAsync(wrapper.Uri);
            }
            catch
            {
                // The completion handler will not run if the call itself throws.
                webClient.Dispose();
                throw;
            }
        }

        // Downloads the URI as raw bytes and delivers them via OnDataArrived.
        private void ProcessAsBinary(string runningId, ScraperDataWrapper wrapper, ScraperDataResponse response, int retryCount)
        {
            WebClient webClient = CreateWebClient(wrapper);
            webClient.DownloadDataCompleted += (sender, args) =>
            {
                // The operation has finished, so the client can be released now (not before).
                webClient.Dispose();
                if (args.Error != null)
                {
                    ExceptionHandlerOnDownloadData(args.Error, wrapper, runningId, retryCount);
                    return;
                }
                response.Response = args.Result;
                OnDownloadSucceeded(runningId, wrapper, response);
            };

            try
            {
                webClient.DownloadDataAsync(wrapper.Uri);
            }
            catch
            {
                // The completion handler will not run if the call itself throws.
                webClient.Dispose();
                throw;
            }
        }

        // Creates a WebClient that uses the item's proxy when one is set.
        private static WebClient CreateWebClient(ScraperDataWrapper wrapper)
        {
            WebClient webClient = new WebClient();
            if (wrapper.Proxy != null)
            {
                webClient.Proxy = wrapper.Proxy;
            }
            return webClient;
        }

        // Success path shared by both download kinds: free the running slot, deliver, check for the end.
        private void OnDownloadSucceeded(string runningId, ScraperDataWrapper wrapper, ScraperDataResponse response)
        {
            RemoveItemFromRunningCollection(wrapper, runningId, 0);
            wrapper.OnDataArrived?.Invoke(response);
            CheckIfProcessIsFinished();
        }

        // Fires the end-of-process notification when nothing is queued or in flight.
        private void CheckIfProcessIsFinished()
        {
            if (Queues.IsEmpty && Running.IsEmpty)
            {
                NotifyEndProcess();
            }
        }

        // Retries WebExceptions up to MaxRetryCount times (after RetryDelayMilliseconds); otherwise
        // frees the running slot, reports the failure via OnThrownException, and checks for the end.
        private void ExceptionHandlerOnDownloadData(Exception ex, ScraperDataWrapper item, string runningId, int retryCount)
        {
            if (ex is WebException && retryCount < MaxRetryCount)
            {
                // Schedule the retry instead of sleeping on the current pool/callback thread.
                Task.Delay(RetryDelayMilliseconds)
                    .ContinueWith(_ => GetDataFromWebServer(runningId, item, retryCount + 1), TaskScheduler.Default);
                return;
            }

            // Always free the slot, even when no OnThrownException handler is registered; otherwise
            // Running never empties, so Stop() and the end-of-process notification wait forever.
            RemoveItemFromRunningCollection(item, runningId, 0);

            ScraperDataResponse scraperDataResponse = ScrapperMapper.ToResponse(item);
            scraperDataResponse.Exception = ex;
            item.OnThrownException?.Invoke(scraperDataResponse);

            CheckIfProcessIsFinished();
        }

        // Removes the running entry, attempting up to (MaxRetryCount - retryCount + 1) times; if it
        // never succeeds, reports an error through the item's OnThrownException.
        private void RemoveItemFromRunningCollection(ScraperData item, string key, int retryCount = 0)
        {
            for (int attempt = retryCount; ; attempt++)
            {
                if (Running.TryRemove(key, out string _))
                {
                    return;
                }
                if (attempt >= MaxRetryCount)
                {
                    break;
                }
            }

            ScraperDataResponse scraperDataResponse = ScrapperMapper.ToResponse(item);
            scraperDataResponse.Exception = new Exception("The scraper data response cannot be deleted from running collection.");
            item.OnThrownException?.Invoke(scraperDataResponse);
        }

        // Invokes every end-of-process subscriber. NOTE(review): several paths call
        // CheckIfProcessIsFinished, so subscribers may be notified more than once per run.
        private void NotifyEndProcess()
        {
            foreach (KeyValuePair<string, Action> item in SubscriptionOnEnd)
            {
                item.Value();
            }
        }
    }
}