olivia_finder.myrequests.request_handler

  1import queue
  2import tqdm
  3import gc
  4from typing import List, Optional
  5from .job import RequestJob
  6from .request_worker import RequestWorker
  7from ..utilities.logger import MyLogger
  8from ..utilities.config import Configuration
  9
 10class RequestHandler:
 11    '''
 12    Main class of the myrequests package. It handles the requests to the server using concurrent workers.
 13
 14    '''
 15
 16    PARALLEL_WORKERS = 8
 17
 18    def __init__(self):
 19        '''
 20        Constructor for RequestHandler class
 21        '''
 22
 23        # Get logger name from config file
 24        # self.logger = MyLogger.get_logger(Configuration().get_key('logger_myrequests', 'name'))
 25        self.logger = MyLogger.get_logger('logger_myrequests')
 26        self.logger.debug("Creating RequestHandler object")
 27
 28        # Initialize 
 29        self.jobs_queue = queue.Queue()
 30        self.num_workers = 1
 31        self.workers: List[RequestWorker] = []
 32
 33    def _clear(self):
 34        '''
 35        Reset the RequestHandler object
 36        '''
 37        
 38        # Delete the objects
 39        del self.jobs_queue
 40        del self.workers
 41        gc.collect()
 42
 43        # Initialize        
 44        self.jobs_queue = queue.Queue()        
 45        self.num_workers = 1
 46        self.workers: List[RequestWorker] = []
 47
 48    def _setup_jobs(
 49            self, 
 50            request_jobs: List[RequestJob], 
 51            num_workers: int, 
 52            progress_bar: Optional[tqdm.tqdm] = None) -> None:
 53        '''
 54        Setup the jobs
 55
 56        Parameters
 57        ----------
 58        request_jobs : List[RequestJob]
 59            List of RequestJob objects
 60        num_workers : int
 61            Number of workers
 62        progress_bar : Optional[tqdm.tqdm], optional
 63            Progress bar, by default None
 64        '''
 65        
 66        # Enqueue jobs
 67        for job in request_jobs:
 68            # job.progress_bar = progress_bar
 69            self.jobs_queue.put(job)
 70
 71        self.logger.debug(f"Jobs queue size: {self.jobs_queue.qsize()}")
 72
 73        # Workers keep working till they receive an exit string
 74        # So we need to add the number of workers times the exit string to the queue
 75        self.num_workers = num_workers
 76        for _ in range(self.num_workers):
 77            self.jobs_queue.put(RequestJob.end_job_signal())
 78
 79        # Create workers and add to the queue
 80        self.logger.debug("Creating workers")
 81
 82        for i in range(self.num_workers):
 83            worker = RequestWorker(i, self.jobs_queue, progress_bar)
 84            self.workers.append(worker)
 85
 86        self.logger.debug(f"Created workers: {len(self.workers)}")
 87
 88    def _setup_job(self, request_job: RequestJob):
 89        '''
 90        Setup a single job
 91
 92        Parameters
 93        ----------
 94        request_job : RequestJob
 95            The RequestJob object
 96        '''
 97        
 98        # Enqueue jobs
 99        self.jobs_queue.put(request_job)
100
101        # Workers keep working till they receive an exit string
102        # So we need to add the number of workers times the exit string to the queue
103        self.jobs_queue.put(RequestJob.end_job_signal())
104
105        # Create workers and add to the queue
106        self.workers.append(
107            RequestWorker(0, self.jobs_queue)
108        )
109        self.logger.debug("Job created")
110
111    def do_requests(
112            self, 
113            request_jobs: List[RequestJob], 
114            num_workers: int = PARALLEL_WORKERS, 
115            progress_bar: Optional[tqdm.tqdm] = None) -> List[RequestJob]:
116        '''
117        Do the requests
118
119        Parameters
120        ----------
121        request_jobs : List[RequestJob]
122            List of RequestJob objects
123        num_workers : int, optional
124            Number of workers, by default PARALLEL_WORKERS
125        progress_bar : Optional[tqdm.tqdm], optional
126            Progress bar, by default None
127        
128        Returns
129        -------
130        list
131            List of RequestJob objects
132
133        Examples
134        --------
135        >>> rh = RequestHandler(jobs)
136        >>> results = rh.do_requests()
137        >>> for job in results:
138        >>>     print(f'key: {job.key}, url: {job.url}, response: {job.response}')
139        '''
140
141        # Clear the RequestHandler object
142        self._clear()
143
144        self.logger.info("Starting requests")
145        self.logger.debug(f"Number of jobs: {len(request_jobs)}")
146
147        # Setup jobs
148        self._setup_jobs(request_jobs, num_workers, progress_bar)
149        
150        # Start workers
151        for worker in self.workers:
152            self.logger.debug(f"Starting worker {worker.worker_id}")
153            worker.start()
154            
155        # Join workers to wait till they finished
156        for worker in self.workers:
157
158            worker.join()
159            self.logger.debug(f"Joining worker {worker.worker_id}")
160
161        # Combine results from all workers
162        workers_finalized_jobs = []
163        for worker in self.workers:
164            self.logger.debug(f"Worker {worker.worker_id} finished")
165            workers_finalized_jobs.extend(worker.my_jobs.copy())
166
167        self.logger.info("All requests finished")
168
169        # Clear memory
170        self._clear()
171
172        return workers_finalized_jobs
173
174    def do_request(self, job: RequestJob):
175        '''
176        Do a single request
177        
178        Parameters
179        ----------
180        job : RequestJob
181            The RequestJob object
182
183        Returns
184        -------
185        RequestJob
186            The RequestJob object with the response
187
188        Examples
189        --------
190        >>> rh = RequestHandler()
191        >>> job = RequestJob("single_job", "https://www.google.com")
192        >>> result = rh.do_request(job)
193        >>> print(f'key: {result.key}, url: {result.url}, response: {result.response}')
194        '''
195
196        # Clear the RequestHandler object
197        self._clear()
198
199        # Setup job
200        self._setup_job(job)
201
202        # Start worker
203        worker = RequestWorker(0, self.jobs_queue)
204        self.logger.debug(f"Starting worker {worker.worker_id}")
205
206        self.logger.info(f"Starting request for {job.key}: {job.url}")
207        worker.start()
208
209        # Join worker to wait till it finished
210        worker.join()
211        self.logger.debug(f"Joining worker {worker.worker_id}")
212        self.logger.info(f"Request for {job.key}: {job.url} finished")
213
214        if worker.my_jobs[0].response is None:
215            self.logger.info(f"Request for {job.key}: {job.url} failed: response is None")
216
217        # Return the job
218        return worker.my_jobs[0]
219    
class RequestHandler:
    '''
    Main class of the myrequests package. It handles the requests to the
    server using concurrent workers.
    '''

    # Default worker count for do_requests().
    PARALLEL_WORKERS = 8

    def __init__(self):
        '''
        Constructor for RequestHandler class
        '''

        # Logger name is hard-coded; the config-based lookup is retained
        # below for reference.
        # self.logger = MyLogger.get_logger(Configuration().get_key('logger_myrequests', 'name'))
        self.logger = MyLogger.get_logger('logger_myrequests')
        self.logger.debug("Creating RequestHandler object")

        self.jobs_queue = queue.Queue()
        self.num_workers = 1
        self.workers: List[RequestWorker] = []

    def _clear(self):
        '''
        Reset the RequestHandler object
        '''

        # Discard the previous queue and worker pool, then collect garbage
        # before rebuilding empty replacements.
        del self.jobs_queue
        del self.workers
        gc.collect()

        self.jobs_queue = queue.Queue()
        self.num_workers = 1
        self.workers = []

    def _setup_jobs(
            self,
            request_jobs: List[RequestJob],
            num_workers: int,
            progress_bar: Optional[tqdm.tqdm] = None) -> None:
        '''
        Fill the queue with the given jobs and build the worker pool.

        Parameters
        ----------
        request_jobs : List[RequestJob]
            List of RequestJob objects
        num_workers : int
            Number of workers
        progress_bar : Optional[tqdm.tqdm], optional
            Progress bar, by default None
        '''

        for pending_job in request_jobs:
            self.jobs_queue.put(pending_job)

        self.logger.debug(f"Jobs queue size: {self.jobs_queue.qsize()}")

        # Each worker stops when it dequeues an end-of-work signal, so add
        # one signal per worker behind the real jobs.
        self.num_workers = num_workers
        for _ in range(num_workers):
            self.jobs_queue.put(RequestJob.end_job_signal())

        self.logger.debug("Creating workers")
        self.workers.extend(
            RequestWorker(worker_id, self.jobs_queue, progress_bar)
            for worker_id in range(num_workers)
        )

        self.logger.debug(f"Created workers: {len(self.workers)}")

    def _setup_job(self, request_job: RequestJob):
        '''
        Queue a single job followed by its end-of-work signal, and register
        one worker for it.

        Parameters
        ----------
        request_job : RequestJob
            The RequestJob object
        '''

        self.jobs_queue.put(request_job)
        self.jobs_queue.put(RequestJob.end_job_signal())
        self.workers.append(RequestWorker(0, self.jobs_queue))
        self.logger.debug("Job created")

    def do_requests(
            self,
            request_jobs: List[RequestJob],
            num_workers: int = PARALLEL_WORKERS,
            progress_bar: Optional[tqdm.tqdm] = None) -> List[RequestJob]:
        '''
        Do the requests

        Parameters
        ----------
        request_jobs : List[RequestJob]
            List of RequestJob objects
        num_workers : int, optional
            Number of workers, by default PARALLEL_WORKERS
        progress_bar : Optional[tqdm.tqdm], optional
            Progress bar, by default None

        Returns
        -------
        list
            List of RequestJob objects

        Examples
        --------
        >>> rh = RequestHandler(jobs)
        >>> results = rh.do_requests()
        >>> for job in results:
        >>>     print(f'key: {job.key}, url: {job.url}, response: {job.response}')
        '''

        self._clear()

        self.logger.info("Starting requests")
        self.logger.debug(f"Number of jobs: {len(request_jobs)}")

        self._setup_jobs(request_jobs, num_workers, progress_bar)

        # Launch every worker thread.
        for active_worker in self.workers:
            self.logger.debug(f"Starting worker {active_worker.worker_id}")
            active_worker.start()

        # Block until each worker has drained its share of the queue.
        for active_worker in self.workers:
            active_worker.join()
            self.logger.debug(f"Joining worker {active_worker.worker_id}")

        # Merge each worker's finished jobs into a single result list.
        finished_jobs: List[RequestJob] = []
        for active_worker in self.workers:
            self.logger.debug(f"Worker {active_worker.worker_id} finished")
            finished_jobs.extend(active_worker.my_jobs.copy())

        self.logger.info("All requests finished")

        self._clear()

        return finished_jobs

    def do_request(self, job: RequestJob):
        '''
        Do a single request

        Parameters
        ----------
        job : RequestJob
            The RequestJob object

        Returns
        -------
        RequestJob
            The RequestJob object with the response

        Examples
        --------
        >>> rh = RequestHandler()
        >>> job = RequestJob("single_job", "https://www.google.com")
        >>> result = rh.do_request(job)
        >>> print(f'key: {result.key}, url: {result.url}, response: {result.response}')
        '''

        self._clear()
        self._setup_job(job)

        # NOTE(review): a fresh worker is created here even though
        # _setup_job already registered one in self.workers; the registered
        # one is never started. Kept as-is to preserve behavior.
        single_worker = RequestWorker(0, self.jobs_queue)
        self.logger.debug(f"Starting worker {single_worker.worker_id}")

        self.logger.info(f"Starting request for {job.key}: {job.url}")
        single_worker.start()

        single_worker.join()
        self.logger.debug(f"Joining worker {single_worker.worker_id}")
        self.logger.info(f"Request for {job.key}: {job.url} finished")

        if single_worker.my_jobs[0].response is None:
            self.logger.info(f"Request for {job.key}: {job.url} failed: response is None")

        return single_worker.my_jobs[0]

Main class of the myrequests package. It handles the requests to the server using concurrent workers.

RequestHandler()
19    def __init__(self):
20        '''
21        Constructor for RequestHandler class
22        '''
23
24        # Get logger name from config file
25        # self.logger = MyLogger.get_logger(Configuration().get_key('logger_myrequests', 'name'))
26        self.logger = MyLogger.get_logger('logger_myrequests')
27        self.logger.debug("Creating RequestHandler object")
28
29        # Initialize 
30        self.jobs_queue = queue.Queue()
31        self.num_workers = 1
32        self.workers: List[RequestWorker] = []

Constructor for RequestHandler class

def do_requests( self, request_jobs: List[olivia_finder.myrequests.job.RequestJob], num_workers: int = 8, progress_bar: Optional[tqdm.std.tqdm] = None) -> List[olivia_finder.myrequests.job.RequestJob]:
112    def do_requests(
113            self, 
114            request_jobs: List[RequestJob], 
115            num_workers: int = PARALLEL_WORKERS, 
116            progress_bar: Optional[tqdm.tqdm] = None) -> List[RequestJob]:
117        '''
118        Do the requests
119
120        Parameters
121        ----------
122        request_jobs : List[RequestJob]
123            List of RequestJob objects
124        num_workers : int, optional
125            Number of workers, by default PARALLEL_WORKERS
126        progress_bar : Optional[tqdm.tqdm], optional
127            Progress bar, by default None
128        
129        Returns
130        -------
131        list
132            List of RequestJob objects
133
134        Examples
135        --------
136        >>> rh = RequestHandler(jobs)
137        >>> results = rh.do_requests()
138        >>> for job in results:
139        >>>     print(f'key: {job.key}, url: {job.url}, response: {job.response}')
140        '''
141
142        # Clear the RequestHandler object
143        self._clear()
144
145        self.logger.info("Starting requests")
146        self.logger.debug(f"Number of jobs: {len(request_jobs)}")
147
148        # Setup jobs
149        self._setup_jobs(request_jobs, num_workers, progress_bar)
150        
151        # Start workers
152        for worker in self.workers:
153            self.logger.debug(f"Starting worker {worker.worker_id}")
154            worker.start()
155            
156        # Join workers to wait till they finished
157        for worker in self.workers:
158
159            worker.join()
160            self.logger.debug(f"Joining worker {worker.worker_id}")
161
162        # Combine results from all workers
163        workers_finalized_jobs = []
164        for worker in self.workers:
165            self.logger.debug(f"Worker {worker.worker_id} finished")
166            workers_finalized_jobs.extend(worker.my_jobs.copy())
167
168        self.logger.info("All requests finished")
169
170        # Clear memory
171        self._clear()
172
173        return workers_finalized_jobs

Do the requests

Parameters
  • request_jobs (List[RequestJob]): List of RequestJob objects
  • num_workers (int, optional): Number of workers, by default PARALLEL_WORKERS
  • progress_bar (Optional[tqdm.tqdm], optional): Progress bar, by default None
Returns
  • List[RequestJob]: the finished RequestJob objects, each carrying its response
Examples
>>> rh = RequestHandler(jobs)
>>> results = rh.do_requests()
>>> for job in results:
>>>     print(f'key: {job.key}, url: {job.url}, response: {job.response}')
def do_request(self, job: olivia_finder.myrequests.job.RequestJob):
175    def do_request(self, job: RequestJob):
176        '''
177        Do a single request
178        
179        Parameters
180        ----------
181        job : RequestJob
182            The RequestJob object
183
184        Returns
185        -------
186        RequestJob
187            The RequestJob object with the response
188
189        Examples
190        --------
191        >>> rh = RequestHandler()
192        >>> job = RequestJob("single_job", "https://www.google.com")
193        >>> result = rh.do_request(job)
194        >>> print(f'key: {result.key}, url: {result.url}, response: {result.response}')
195        '''
196
197        # Clear the RequestHandler object
198        self._clear()
199
200        # Setup job
201        self._setup_job(job)
202
203        # Start worker
204        worker = RequestWorker(0, self.jobs_queue)
205        self.logger.debug(f"Starting worker {worker.worker_id}")
206
207        self.logger.info(f"Starting request for {job.key}: {job.url}")
208        worker.start()
209
210        # Join worker to wait till it finished
211        worker.join()
212        self.logger.debug(f"Joining worker {worker.worker_id}")
213        self.logger.info(f"Request for {job.key}: {job.url} finished")
214
215        if worker.my_jobs[0].response is None:
216            self.logger.info(f"Request for {job.key}: {job.url} failed: response is None")
217
218        # Return the job
219        return worker.my_jobs[0]

Do a single request

Parameters
  • job (RequestJob): The RequestJob object
Returns
  • RequestJob: The RequestJob object with the response
Examples
>>> rh = RequestHandler()
>>> job = RequestJob("single_job", "https://www.google.com")
>>> result = rh.do_request(job)
>>> print(f'key: {result.key}, url: {result.url}, response: {result.response}')