olivia_finder.myrequests.request_worker

  1import queue
  2import time
  3from typing import Optional, Tuple
  4import requests
  5from threading import Thread
  6import tqdm
  7from .job import RequestJob
  8from .proxy_handler import ProxyHandler
  9from .useragent_handler import UserAgentHandler
 10from ..utilities.logger import MyLogger
 11from ..utilities.config import Configuration
 12
 13
 14class RequestWorker(Thread):
 15    '''
 16    RequestWorker class, inherits from Thread, so it can be run in parallel
 17    This class is responsible for doing the requests and storing the results in the RequestJob objects
 18    Use the run() method to start the worker
 19    The worker has a queue of RequestJob objects, it will get the next job from the queue and do the request
 20    The worker has access to the ProxyHandler and UserAgentHandler objects, so it can get a proxy and a user agent for each request
 21    The worker will store the results in the RequestJob object and add it to the list of jobs that it has done
 22    The worker will keep working till it receives an exit string from the queue
 23    The exit string is a RequestJob object with key = RequestJob.FINALIZE_KEY
 24
 25    '''
 26
 27    # Constants
 28    RETRIES = 0
 29    RETRY_DELAY = 0
 30    TIMEOUT = 30
 31    
 32    def __init__(
 33            self, 
 34            worker_id: int, 
 35            jobs_queue: queue.Queue, 
 36            progress_bar: Optional[tqdm.tqdm] = None):
 37        '''
 38        Constructor
 39
 40        Parameters
 41        ----------
 42        worker_id : int
 43            Id of the worker
 44        jobs_queue : queue.Queue
 45            Queue of RequestJob objects
 46        progress_bar : tqdm.tqdm
 47            Progress bar, if None, no progress bar will be shown
 48
 49        '''
 50
 51        Thread.__init__(self)
 52        self.worker_id = worker_id
 53        self.jobs_queue = jobs_queue
 54        self.my_jobs = []
 55        self.progress_bar = progress_bar
 56        
 57        # Create handlers (Singletons)
 58        self.proxy_handler = ProxyHandler()
 59        self.user_agent_handler = UserAgentHandler()
 60        self.stopped = False
 61
 62        # Get logger name from config file
 63        self.logger = MyLogger.get_logger('logger_myrequests')
 64        self.logger.debug(f"Creating RequestWorker object {self.worker_id}")
 65
 66
 67    def run(self):
 68        '''
 69        Run the worker thread till it receives an exit signal
 70        '''
 71
 72        while not self.stopped:
 73
 74            # Get next url from the queue
 75            job = self.jobs_queue.get()
 76            message =   f"Worker {self.worker_id}: Got job from queue\n" + \
 77                        f"Worker {self.worker_id}: {job}" + \
 78                        f"Worker {self.worker_id}: Queue size: {self.jobs_queue.qsize()}"
 79
 80            self.logger.debug(message)
 81
 82            # If exit string is received, break the loop
 83            if job.key == RequestJob.FINALIZE_KEY:
 84                break
 85
 86            # Do the request
 87            message = f"Worker {self.worker_id}: Doing request"
 88            self.logger.debug(message)
 89            try:
 90                proxy, user_agent = self._obtain_request_args()
 91                response = self._do_request(job.url, 
 92                                            proxy=proxy, 
 93                                            headers={"User-Agent": user_agent}, 
 94                                            params=job.params)
 95            except Exception as e:
 96                self.logger.error(f"Worker {self.worker_id}: Error doing request job: {e}")
 97                response = None
 98
 99            # Check if the response is valid
100            if response is None or response.status_code != 200:
101                self.logger.error(f"Worker {self.worker_id}: Error doing request job: {response}")
102                response = None
103
104            # Set the response in the job and add it to the list of jobs
105            job.set_response(response)
106            self.my_jobs.append(job)
107            self.jobs_queue.task_done()
108
109            if self.progress_bar is not None:
110                self.progress_bar.update(1)
111            
112    def _obtain_request_args(self) -> Tuple[str, str]:
113        '''
114        Obtain the proxy and user agent to use for the request
115
116        Returns
117        -------
118        tuple[str, str]
119            Tuple with the proxy and user agent to use for the request
120        '''
121
122        # Get a proxy with the lock
123        self.proxy_handler.lock.acquire()
124        proxy = self.proxy_handler.get_next_proxy()
125        self.proxy_handler.lock.release()
126        
127        # Get a user agent with the lock
128        self.user_agent_handler.lock.acquire()
129        user_agent = self.user_agent_handler.get_next_useragent()
130        self.user_agent_handler.lock.release()
131        
132        return proxy, user_agent
133        
134    def _do_request(self, 
135                    url: str,
136                    timeout: int = TIMEOUT,
137                    retries: int = RETRIES,
138                    retry_delay: int = RETRY_DELAY,
139                    data: Optional[dict] = None,
140                    proxy: Optional[str] = None,
141                    headers: Optional[dict] = None,
142                    params: Optional[dict] = None) -> Optional[requests.Response]:
143
144        '''
145        Do a request using requests library, with retries, the parameters are the same as requests.get
146        Handles the exceptions and retries
147
148
149        Parameters
150        ----------
151        url : str
152            Url to do the request
153        timeout : int, optional
154            Timeout for the request, by default TIMEOUT
155        retries : int, optional
156            Number of retries, by default RETRIES
157        retry_delay : int, optional
158            Delay between retries, by default RETRY_DELAY
159        data : dict = None, optional
160            Data to send with the request, by default None
161        proxy : str = None, optional
162            Proxy to use for the request, by default None
163        headers : dict = None, optional
164            Headers to use for the request, by default None
165        params : dict = None, optional
166            Parameters to use for the request, by default None
167
168        Returns
169        -------
170        requests.Response
171            Response of the request
172
173        '''
174        
175        # Do the request
176        if proxy is None:
177            curr_proxy = None
178        else:
179            curr_proxy = {"http": proxy}
180
181        try:
182            response = requests.get(url, 
183                                    timeout=timeout, 
184                                    headers=headers,
185                                    proxies=curr_proxy,
186                                    data=data,
187                                    params=params)
188
189        except Exception as e1:
190
191            # Retry if there are retries left
192            self.logger.error(f"Worker {self.worker_id}: {url}, Exception: {e1}")
193            response = None
194            while response is None and retries > 0:
195                retries -= 1
196                time.sleep(retry_delay)
197                self.logger.debug(f"Worker {self.worker_id}: Retrying {url}, Retries left: {retries}")
198                try:
199                    response = requests.get(url, 
200                                            timeout=timeout, 
201                                            headers=headers,
202                                            proxies=curr_proxy,
203                                            data=data)
204                except Exception as e2:
205                    self.logger.error(f"Worker {self.worker_id}: {url}, Exception: {e2}")
206                    response = None
207
208        self.logger.debug(f"Worker {self.worker_id}: {url}, Response: {response}")
209            
210        return response
211    
class RequestWorker(Thread):
    '''
    RequestWorker class, inherits from Thread, so it can be run in parallel
    This class is responsible for doing the requests and storing the results in the RequestJob objects
    Use the run() method to start the worker
    The worker has a queue of RequestJob objects, it will get the next job from the queue and do the request
    The worker has access to the ProxyHandler and UserAgentHandler objects, so it can get a proxy and a user agent for each request
    The worker will store the results in the RequestJob object and add it to the list of jobs that it has done
    The worker will keep working till it receives an exit string from the queue
    The exit string is a RequestJob object with key = RequestJob.FINALIZE_KEY

    '''

    # Constants (defaults for _do_request)
    RETRIES = 0       # extra attempts after the first request fails
    RETRY_DELAY = 0   # seconds to sleep between retries
    TIMEOUT = 30      # per-request timeout in seconds

    def __init__(
            self,
            worker_id: int,
            jobs_queue: queue.Queue,
            progress_bar: Optional[tqdm.tqdm] = None):
        '''
        Constructor

        Parameters
        ----------
        worker_id : int
            Id of the worker
        jobs_queue : queue.Queue
            Queue of RequestJob objects
        progress_bar : tqdm.tqdm
            Progress bar, if None, no progress bar will be shown

        '''

        Thread.__init__(self)
        self.worker_id = worker_id
        self.jobs_queue = jobs_queue
        self.my_jobs = []               # jobs completed by this worker
        self.progress_bar = progress_bar

        # Create handlers (Singletons, shared with the other workers)
        self.proxy_handler = ProxyHandler()
        self.user_agent_handler = UserAgentHandler()
        self.stopped = False

        # Get logger name from config file
        self.logger = MyLogger.get_logger('logger_myrequests')
        self.logger.debug(f"Creating RequestWorker object {self.worker_id}")

    def run(self):
        '''
        Run the worker thread till it receives an exit signal
        (a RequestJob whose key is RequestJob.FINALIZE_KEY)
        '''

        while not self.stopped:

            # Get next url from the queue (blocks until a job is available)
            job = self.jobs_queue.get()
            message = (f"Worker {self.worker_id}: Got job from queue\n"
                       f"Worker {self.worker_id}: {job}"
                       f"Worker {self.worker_id}: Queue size: {self.jobs_queue.qsize()}")

            self.logger.debug(message)

            # If exit string is received, break the loop
            if job.key == RequestJob.FINALIZE_KEY:
                # FIX: mark the sentinel as processed so jobs_queue.join()
                # does not hang waiting for this item forever
                self.jobs_queue.task_done()
                break

            # Do the request
            self.logger.debug(f"Worker {self.worker_id}: Doing request")
            try:
                proxy, user_agent = self._obtain_request_args()
                response = self._do_request(job.url,
                                            proxy=proxy,
                                            headers={"User-Agent": user_agent},
                                            params=job.params)
            except Exception as e:
                self.logger.error(f"Worker {self.worker_id}: Error doing request job: {e}")
                response = None

            # Treat any non-200 response as a failure (callers receive None)
            if response is None or response.status_code != 200:
                self.logger.error(f"Worker {self.worker_id}: Error doing request job: {response}")
                response = None

            # Set the response in the job and add it to the list of jobs
            job.set_response(response)
            self.my_jobs.append(job)
            self.jobs_queue.task_done()

            if self.progress_bar is not None:
                self.progress_bar.update(1)

    def _obtain_request_args(self) -> Tuple[str, str]:
        '''
        Obtain the proxy and user agent to use for the request

        Returns
        -------
        tuple[str, str]
            Tuple with the proxy and user agent to use for the request
        '''

        # FIX: use the locks as context managers so they are always released,
        # even if a handler raises (the previous bare acquire()/release()
        # pair would leave the lock held forever on error)
        with self.proxy_handler.lock:
            proxy = self.proxy_handler.get_next_proxy()

        with self.user_agent_handler.lock:
            user_agent = self.user_agent_handler.get_next_useragent()

        return proxy, user_agent

    def _do_request(self,
                    url: str,
                    timeout: int = TIMEOUT,
                    retries: int = RETRIES,
                    retry_delay: int = RETRY_DELAY,
                    data: Optional[dict] = None,
                    proxy: Optional[str] = None,
                    headers: Optional[dict] = None,
                    params: Optional[dict] = None) -> Optional[requests.Response]:
        '''
        Do a request using requests library, with retries, the parameters are the same as requests.get
        Handles the exceptions and retries

        Parameters
        ----------
        url : str
            Url to do the request
        timeout : int, optional
            Timeout for the request, by default TIMEOUT
        retries : int, optional
            Number of retries, by default RETRIES
        retry_delay : int, optional
            Delay between retries, by default RETRY_DELAY
        data : dict = None, optional
            Data to send with the request, by default None
        proxy : str = None, optional
            Proxy to use for the request, by default None
        headers : dict = None, optional
            Headers to use for the request, by default None
        params : dict = None, optional
            Parameters to use for the request, by default None

        Returns
        -------
        requests.Response
            Response of the request, or None if every attempt failed

        '''

        # requests expects a mapping of scheme -> proxy url
        curr_proxy = None if proxy is None else {"http": proxy}

        try:
            response = requests.get(url,
                                    timeout=timeout,
                                    headers=headers,
                                    proxies=curr_proxy,
                                    data=data,
                                    params=params)

        except Exception as e1:

            # Retry if there are retries left
            self.logger.error(f"Worker {self.worker_id}: {url}, Exception: {e1}")
            response = None
            while response is None and retries > 0:
                retries -= 1
                time.sleep(retry_delay)
                self.logger.debug(f"Worker {self.worker_id}: Retrying {url}, Retries left: {retries}")
                try:
                    # FIX: retries now also send `params` — they were silently
                    # dropped before, so retried requests hit a different url
                    # than the first attempt
                    response = requests.get(url,
                                            timeout=timeout,
                                            headers=headers,
                                            proxies=curr_proxy,
                                            data=data,
                                            params=params)
                except Exception as e2:
                    self.logger.error(f"Worker {self.worker_id}: {url}, Exception: {e2}")
                    response = None

        self.logger.debug(f"Worker {self.worker_id}: {url}, Response: {response}")

        return response

RequestWorker class, inherits from Thread, so it can be run in parallel. This class is responsible for doing the requests and storing the results in the RequestJob objects. Use the run() method to start the worker. The worker has a queue of RequestJob objects; it will get the next job from the queue and do the request. The worker has access to the ProxyHandler and UserAgentHandler objects, so it can get a proxy and a user agent for each request. The worker will store the results in the RequestJob object and add it to the list of jobs that it has done. The worker will keep working till it receives an exit string from the queue. The exit string is a RequestJob object with key = RequestJob.FINALIZE_KEY.

RequestWorker( worker_id: int, jobs_queue: queue.Queue, progress_bar: Optional[tqdm.std.tqdm] = None)
33    def __init__(
34            self, 
35            worker_id: int, 
36            jobs_queue: queue.Queue, 
37            progress_bar: Optional[tqdm.tqdm] = None):
38        '''
39        Constructor
40
41        Parameters
42        ----------
43        worker_id : int
44            Id of the worker
45        jobs_queue : queue.Queue
46            Queue of RequestJob objects
47        progress_bar : tqdm.tqdm
48            Progress bar, if None, no progress bar will be shown
49
50        '''
51
52        Thread.__init__(self)
53        self.worker_id = worker_id
54        self.jobs_queue = jobs_queue
55        self.my_jobs = []
56        self.progress_bar = progress_bar
57        
58        # Create handlers (Singletons)
59        self.proxy_handler = ProxyHandler()
60        self.user_agent_handler = UserAgentHandler()
61        self.stopped = False
62
63        # Get logger name from config file
64        self.logger = MyLogger.get_logger('logger_myrequests')
65        self.logger.debug(f"Creating RequestWorker object {self.worker_id}")

Constructor

Parameters
  • worker_id (int): Id of the worker
  • jobs_queue (queue.Queue): Queue of RequestJob objects
  • progress_bar (tqdm.tqdm): Progress bar, if None, no progress bar will be shown
def run(self):
 68    def run(self):
 69        '''
 70        Run the worker thread till it receives an exit signal
 71        '''
 72
 73        while not self.stopped:
 74
 75            # Get next url from the queue
 76            job = self.jobs_queue.get()
 77            message =   f"Worker {self.worker_id}: Got job from queue\n" + \
 78                        f"Worker {self.worker_id}: {job}" + \
 79                        f"Worker {self.worker_id}: Queue size: {self.jobs_queue.qsize()}"
 80
 81            self.logger.debug(message)
 82
 83            # If exit string is received, break the loop
 84            if job.key == RequestJob.FINALIZE_KEY:
 85                break
 86
 87            # Do the request
 88            message = f"Worker {self.worker_id}: Doing request"
 89            self.logger.debug(message)
 90            try:
 91                proxy, user_agent = self._obtain_request_args()
 92                response = self._do_request(job.url, 
 93                                            proxy=proxy, 
 94                                            headers={"User-Agent": user_agent}, 
 95                                            params=job.params)
 96            except Exception as e:
 97                self.logger.error(f"Worker {self.worker_id}: Error doing request job: {e}")
 98                response = None
 99
100            # Check if the response is valid
101            if response is None or response.status_code != 200:
102                self.logger.error(f"Worker {self.worker_id}: Error doing request job: {response}")
103                response = None
104
105            # Set the response in the job and add it to the list of jobs
106            job.set_response(response)
107            self.my_jobs.append(job)
108            self.jobs_queue.task_done()
109
110            if self.progress_bar is not None:
111                self.progress_bar.update(1)

Run the worker thread till it receives an exit signal

Inherited Members
threading.Thread
start
join
name
ident
is_alive
daemon
isDaemon
setDaemon
getName
setName
native_id