olivia_finder.myrequests.request_worker
import queue
import time
from typing import Optional, Tuple
import requests
from threading import Thread
import tqdm
from .job import RequestJob
from .proxy_handler import ProxyHandler
from .useragent_handler import UserAgentHandler
from ..utilities.logger import MyLogger
from ..utilities.config import Configuration


class RequestWorker(Thread):
    '''
    RequestWorker class, inherits from Thread, so it can be run in parallel
    This class is responsible for doing the requests and storing the results in the RequestJob objects
    Use the run() method to start the worker
    The worker has a queue of RequestJob objects, it will get the next job from the queue and do the request
    The worker has access to the ProxyHandler and UserAgentHandler objects, so it can get a proxy and a user agent for each request
    The worker will store the results in the RequestJob object and add it to the list of jobs that it has done
    The worker will keep working until it receives an exit signal from the queue
    The exit signal is a RequestJob object with key = RequestJob.FINALIZE_KEY
    '''

    # Default request behaviour: no retries, no delay between retries,
    # 30-second timeout per request.
    RETRIES = 0
    RETRY_DELAY = 0
    TIMEOUT = 30

    def __init__(
        self,
        worker_id: int,
        jobs_queue: queue.Queue,
        progress_bar: Optional[tqdm.tqdm] = None):
        '''
        Constructor

        Parameters
        ----------
        worker_id : int
            Id of the worker
        jobs_queue : queue.Queue
            Queue of RequestJob objects
        progress_bar : tqdm.tqdm, optional
            Progress bar, if None, no progress bar will be shown
        '''

        Thread.__init__(self)
        self.worker_id = worker_id
        self.jobs_queue = jobs_queue
        self.my_jobs = []  # jobs completed by this worker
        self.progress_bar = progress_bar

        # Create handlers (Singletons)
        self.proxy_handler = ProxyHandler()
        self.user_agent_handler = UserAgentHandler()
        self.stopped = False

        # Get logger name from config file
        self.logger = MyLogger.get_logger('logger_myrequests')
        self.logger.debug(f"Creating RequestWorker object {self.worker_id}")

    def run(self):
        '''
        Run the worker thread until it receives an exit signal
        '''

        while not self.stopped:

            # Get next job from the queue
            job = self.jobs_queue.get()
            # FIX: the original concatenation was missing a newline between
            # the job repr and the queue-size part, gluing them into one line.
            message = (
                f"Worker {self.worker_id}: Got job from queue\n"
                f"Worker {self.worker_id}: {job}\n"
                f"Worker {self.worker_id}: Queue size: {self.jobs_queue.qsize()}"
            )
            self.logger.debug(message)

            # If the finalization job is received, acknowledge it and stop.
            if job.key == RequestJob.FINALIZE_KEY:
                # FIX: the sentinel job was never task_done()-acknowledged,
                # which would make jobs_queue.join() block forever.
                self.jobs_queue.task_done()
                break

            # Do the request
            self.logger.debug(f"Worker {self.worker_id}: Doing request")
            try:
                proxy, user_agent = self._obtain_request_args()
                response = self._do_request(job.url,
                                            proxy=proxy,
                                            headers={"User-Agent": user_agent},
                                            params=job.params)
            except Exception as e:
                self.logger.error(f"Worker {self.worker_id}: Error doing request job: {e}")
                response = None

            # Discard anything that is not a successful (HTTP 200) response.
            if response is None or response.status_code != 200:
                self.logger.error(f"Worker {self.worker_id}: Error doing request job: {response}")
                response = None

            # Set the response in the job and add it to the list of jobs
            job.set_response(response)
            self.my_jobs.append(job)
            self.jobs_queue.task_done()

            if self.progress_bar is not None:
                self.progress_bar.update(1)

    def _obtain_request_args(self) -> Tuple[str, str]:
        '''
        Obtain the proxy and user agent to use for the request

        Returns
        -------
        Tuple[str, str]
            Tuple with the proxy and user agent to use for the request
        '''

        # FIX: use the locks as context managers so they are released even if
        # the handler raises (bare acquire()/release() leaked the lock then).
        with self.proxy_handler.lock:
            proxy = self.proxy_handler.get_next_proxy()

        with self.user_agent_handler.lock:
            user_agent = self.user_agent_handler.get_next_useragent()

        return proxy, user_agent

    def _do_request(self,
                    url: str,
                    timeout: int = TIMEOUT,
                    retries: int = RETRIES,
                    retry_delay: int = RETRY_DELAY,
                    data: Optional[dict] = None,
                    proxy: Optional[str] = None,
                    headers: Optional[dict] = None,
                    params: Optional[dict] = None) -> Optional[requests.Response]:
        '''
        Do a request using requests library, with retries, the parameters are
        the same as requests.get. Handles the exceptions and retries.

        Parameters
        ----------
        url : str
            Url to do the request
        timeout : int, optional
            Timeout for the request, by default TIMEOUT
        retries : int, optional
            Number of retries, by default RETRIES
        retry_delay : int, optional
            Delay between retries, by default RETRY_DELAY
        data : dict, optional
            Data to send with the request, by default None
        proxy : str, optional
            Proxy to use for the request, by default None
        headers : dict, optional
            Headers to use for the request, by default None
        params : dict, optional
            Parameters to use for the request, by default None

        Returns
        -------
        Optional[requests.Response]
            Response of the request, or None if every attempt failed
        '''

        # requests expects a scheme -> proxy-url mapping.
        curr_proxy = None if proxy is None else {"http": proxy}

        try:
            response = requests.get(url,
                                    timeout=timeout,
                                    headers=headers,
                                    proxies=curr_proxy,
                                    data=data,
                                    params=params)
        except Exception as e1:

            # Retry if there are retries left
            self.logger.error(f"Worker {self.worker_id}: {url}, Exception: {e1}")
            response = None
            while response is None and retries > 0:
                retries -= 1
                time.sleep(retry_delay)
                self.logger.debug(f"Worker {self.worker_id}: Retrying {url}, Retries left: {retries}")
                try:
                    # FIX: the retry call dropped the ``params`` argument, so
                    # retried requests were sent without the query parameters.
                    response = requests.get(url,
                                            timeout=timeout,
                                            headers=headers,
                                            proxies=curr_proxy,
                                            data=data,
                                            params=params)
                except Exception as e2:
                    self.logger.error(f"Worker {self.worker_id}: {url}, Exception: {e2}")
                    response = None

        self.logger.debug(f"Worker {self.worker_id}: {url}, Response: {response}")

        return response
class RequestWorker(Thread):
    '''
    RequestWorker class, inherits from Thread, so it can be run in parallel
    This class is responsible for doing the requests and storing the results in the RequestJob objects
    Use the run() method to start the worker
    The worker has a queue of RequestJob objects, it will get the next job from the queue and do the request
    The worker has access to the ProxyHandler and UserAgentHandler objects, so it can get a proxy and a user agent for each request
    The worker will store the results in the RequestJob object and add it to the list of jobs that it has done
    The worker will keep working until it receives an exit signal from the queue
    The exit signal is a RequestJob object with key = RequestJob.FINALIZE_KEY
    '''

    # Default request behaviour: no retries, no delay between retries,
    # 30-second timeout per request.
    RETRIES = 0
    RETRY_DELAY = 0
    TIMEOUT = 30

    def __init__(
        self,
        worker_id: int,
        jobs_queue: queue.Queue,
        progress_bar: Optional[tqdm.tqdm] = None):
        '''
        Constructor

        Parameters
        ----------
        worker_id : int
            Id of the worker
        jobs_queue : queue.Queue
            Queue of RequestJob objects
        progress_bar : tqdm.tqdm, optional
            Progress bar, if None, no progress bar will be shown
        '''

        Thread.__init__(self)
        self.worker_id = worker_id
        self.jobs_queue = jobs_queue
        self.my_jobs = []  # jobs completed by this worker
        self.progress_bar = progress_bar

        # Create handlers (Singletons)
        self.proxy_handler = ProxyHandler()
        self.user_agent_handler = UserAgentHandler()
        self.stopped = False

        # Get logger name from config file
        self.logger = MyLogger.get_logger('logger_myrequests')
        self.logger.debug(f"Creating RequestWorker object {self.worker_id}")

    def run(self):
        '''
        Run the worker thread until it receives an exit signal
        '''

        while not self.stopped:

            # Get next job from the queue
            job = self.jobs_queue.get()
            # FIX: the original concatenation was missing a newline between
            # the job repr and the queue-size part, gluing them into one line.
            message = (
                f"Worker {self.worker_id}: Got job from queue\n"
                f"Worker {self.worker_id}: {job}\n"
                f"Worker {self.worker_id}: Queue size: {self.jobs_queue.qsize()}"
            )
            self.logger.debug(message)

            # If the finalization job is received, acknowledge it and stop.
            if job.key == RequestJob.FINALIZE_KEY:
                # FIX: the sentinel job was never task_done()-acknowledged,
                # which would make jobs_queue.join() block forever.
                self.jobs_queue.task_done()
                break

            # Do the request
            self.logger.debug(f"Worker {self.worker_id}: Doing request")
            try:
                proxy, user_agent = self._obtain_request_args()
                response = self._do_request(job.url,
                                            proxy=proxy,
                                            headers={"User-Agent": user_agent},
                                            params=job.params)
            except Exception as e:
                self.logger.error(f"Worker {self.worker_id}: Error doing request job: {e}")
                response = None

            # Discard anything that is not a successful (HTTP 200) response.
            if response is None or response.status_code != 200:
                self.logger.error(f"Worker {self.worker_id}: Error doing request job: {response}")
                response = None

            # Set the response in the job and add it to the list of jobs
            job.set_response(response)
            self.my_jobs.append(job)
            self.jobs_queue.task_done()

            if self.progress_bar is not None:
                self.progress_bar.update(1)

    def _obtain_request_args(self) -> Tuple[str, str]:
        '''
        Obtain the proxy and user agent to use for the request

        Returns
        -------
        Tuple[str, str]
            Tuple with the proxy and user agent to use for the request
        '''

        # FIX: use the locks as context managers so they are released even if
        # the handler raises (bare acquire()/release() leaked the lock then).
        with self.proxy_handler.lock:
            proxy = self.proxy_handler.get_next_proxy()

        with self.user_agent_handler.lock:
            user_agent = self.user_agent_handler.get_next_useragent()

        return proxy, user_agent

    def _do_request(self,
                    url: str,
                    timeout: int = TIMEOUT,
                    retries: int = RETRIES,
                    retry_delay: int = RETRY_DELAY,
                    data: Optional[dict] = None,
                    proxy: Optional[str] = None,
                    headers: Optional[dict] = None,
                    params: Optional[dict] = None) -> Optional[requests.Response]:
        '''
        Do a request using requests library, with retries, the parameters are
        the same as requests.get. Handles the exceptions and retries.

        Parameters
        ----------
        url : str
            Url to do the request
        timeout : int, optional
            Timeout for the request, by default TIMEOUT
        retries : int, optional
            Number of retries, by default RETRIES
        retry_delay : int, optional
            Delay between retries, by default RETRY_DELAY
        data : dict, optional
            Data to send with the request, by default None
        proxy : str, optional
            Proxy to use for the request, by default None
        headers : dict, optional
            Headers to use for the request, by default None
        params : dict, optional
            Parameters to use for the request, by default None

        Returns
        -------
        Optional[requests.Response]
            Response of the request, or None if every attempt failed
        '''

        # requests expects a scheme -> proxy-url mapping.
        curr_proxy = None if proxy is None else {"http": proxy}

        try:
            response = requests.get(url,
                                    timeout=timeout,
                                    headers=headers,
                                    proxies=curr_proxy,
                                    data=data,
                                    params=params)
        except Exception as e1:

            # Retry if there are retries left
            self.logger.error(f"Worker {self.worker_id}: {url}, Exception: {e1}")
            response = None
            while response is None and retries > 0:
                retries -= 1
                time.sleep(retry_delay)
                self.logger.debug(f"Worker {self.worker_id}: Retrying {url}, Retries left: {retries}")
                try:
                    # FIX: the retry call dropped the ``params`` argument, so
                    # retried requests were sent without the query parameters.
                    response = requests.get(url,
                                            timeout=timeout,
                                            headers=headers,
                                            proxies=curr_proxy,
                                            data=data,
                                            params=params)
                except Exception as e2:
                    self.logger.error(f"Worker {self.worker_id}: {url}, Exception: {e2}")
                    response = None

        self.logger.debug(f"Worker {self.worker_id}: {url}, Response: {response}")

        return response
RequestWorker class; inherits from Thread so it can run in parallel. This class is responsible for performing requests and storing the results in RequestJob objects. Use the run() method to start the worker. The worker holds a queue of RequestJob objects; it takes the next job from the queue and performs the request. The worker has access to the ProxyHandler and UserAgentHandler objects, so it can obtain a proxy and a user agent for each request. It stores each result in the corresponding RequestJob object and adds the job to the list of jobs it has completed. The worker keeps working until it receives an exit signal from the queue: a RequestJob object whose key is RequestJob.FINALIZE_KEY.
33 def __init__( 34 self, 35 worker_id: int, 36 jobs_queue: queue.Queue, 37 progress_bar: Optional[tqdm.tqdm] = None): 38 ''' 39 Constructor 40 41 Parameters 42 ---------- 43 worker_id : int 44 Id of the worker 45 jobs_queue : queue.Queue 46 Queue of RequestJob objects 47 progress_bar : tqdm.tqdm 48 Progress bar, if None, no progress bar will be shown 49 50 ''' 51 52 Thread.__init__(self) 53 self.worker_id = worker_id 54 self.jobs_queue = jobs_queue 55 self.my_jobs = [] 56 self.progress_bar = progress_bar 57 58 # Create handlers (Singletons) 59 self.proxy_handler = ProxyHandler() 60 self.user_agent_handler = UserAgentHandler() 61 self.stopped = False 62 63 # Get logger name from config file 64 self.logger = MyLogger.get_logger('logger_myrequests') 65 self.logger.debug(f"Creating RequestWorker object {self.worker_id}")
Constructor
Parameters
- worker_id (int): Id of the worker
- jobs_queue (queue.Queue): Queue of RequestJob objects
- progress_bar (tqdm.tqdm): Progress bar, if None, no progress bar will be shown
68 def run(self): 69 ''' 70 Run the worker thread till it receives an exit signal 71 ''' 72 73 while not self.stopped: 74 75 # Get next url from the queue 76 job = self.jobs_queue.get() 77 message = f"Worker {self.worker_id}: Got job from queue\n" + \ 78 f"Worker {self.worker_id}: {job}" + \ 79 f"Worker {self.worker_id}: Queue size: {self.jobs_queue.qsize()}" 80 81 self.logger.debug(message) 82 83 # If exit string is received, break the loop 84 if job.key == RequestJob.FINALIZE_KEY: 85 break 86 87 # Do the request 88 message = f"Worker {self.worker_id}: Doing request" 89 self.logger.debug(message) 90 try: 91 proxy, user_agent = self._obtain_request_args() 92 response = self._do_request(job.url, 93 proxy=proxy, 94 headers={"User-Agent": user_agent}, 95 params=job.params) 96 except Exception as e: 97 self.logger.error(f"Worker {self.worker_id}: Error doing request job: {e}") 98 response = None 99 100 # Check if the response is valid 101 if response is None or response.status_code != 200: 102 self.logger.error(f"Worker {self.worker_id}: Error doing request job: {response}") 103 response = None 104 105 # Set the response in the job and add it to the list of jobs 106 job.set_response(response) 107 self.my_jobs.append(job) 108 self.jobs_queue.task_done() 109 110 if self.progress_bar is not None: 111 self.progress_bar.update(1)
Run the worker thread until it receives an exit signal
Inherited Members
- threading.Thread
- start
- join
- name
- ident
- is_alive
- daemon
- isDaemon
- setDaemon
- getName
- setName
- native_id