olivia_finder.myrequests.request_handler
import gc
import queue
from typing import List, Optional

import tqdm

from .job import RequestJob
from .request_worker import RequestWorker
from ..utilities.config import Configuration
from ..utilities.logger import MyLogger


class RequestHandler:
    '''
    Main class of the myrequests package.

    Dispatches a batch of RequestJob objects to a pool of RequestWorker
    threads that consume a shared queue until each receives an
    end-of-work signal.
    '''

    # Default number of concurrent workers used by do_requests().
    PARALLEL_WORKERS = 8

    def __init__(self):
        '''
        Constructor for RequestHandler class
        '''
        # NOTE(review): logger name is hard-coded; the Configuration-based
        # lookup was disabled upstream — confirm before re-enabling.
        self.logger = MyLogger.get_logger('logger_myrequests')
        self.logger.debug("Creating RequestHandler object")

        # jobs_queue: shared work queue consumed by the workers.
        self.jobs_queue = queue.Queue()
        self.num_workers = 1
        self.workers: List[RequestWorker] = []

    def _clear(self) -> None:
        '''
        Reset the RequestHandler object, releasing previous jobs/workers.
        '''
        # Drop references so finished workers/jobs can be collected before
        # the next batch is set up.
        del self.jobs_queue
        del self.workers
        gc.collect()

        self.jobs_queue = queue.Queue()
        self.num_workers = 1
        self.workers: List[RequestWorker] = []

    def _setup_jobs(
        self,
        request_jobs: List[RequestJob],
        num_workers: int,
        progress_bar: Optional[tqdm.tqdm] = None) -> None:
        '''
        Enqueue the jobs and create (but do not start) the workers.

        Parameters
        ----------
        request_jobs : List[RequestJob]
            List of RequestJob objects
        num_workers : int
            Number of workers
        progress_bar : Optional[tqdm.tqdm], optional
            Progress bar, by default None
        '''
        for job in request_jobs:
            self.jobs_queue.put(job)

        self.logger.debug(f"Jobs queue size: {self.jobs_queue.qsize()}")

        # Workers keep working until they receive an end-of-work signal,
        # so enqueue exactly one signal per worker.
        self.num_workers = num_workers
        for _ in range(self.num_workers):
            self.jobs_queue.put(RequestJob.end_job_signal())

        self.logger.debug("Creating workers")
        for i in range(self.num_workers):
            self.workers.append(RequestWorker(i, self.jobs_queue, progress_bar))

        self.logger.debug(f"Created workers: {len(self.workers)}")

    def _setup_job(self, request_job: RequestJob) -> None:
        '''
        Enqueue a single job and create (but do not start) one worker.

        Parameters
        ----------
        request_job : RequestJob
            The RequestJob object
        '''
        self.jobs_queue.put(request_job)
        # One worker -> one end-of-work signal.
        self.jobs_queue.put(RequestJob.end_job_signal())

        self.workers.append(RequestWorker(0, self.jobs_queue))
        self.logger.debug("Job created")

    def do_requests(
        self,
        request_jobs: List[RequestJob],
        num_workers: int = PARALLEL_WORKERS,
        progress_bar: Optional[tqdm.tqdm] = None) -> List[RequestJob]:
        '''
        Do the requests concurrently and return the completed jobs.

        Parameters
        ----------
        request_jobs : List[RequestJob]
            List of RequestJob objects
        num_workers : int, optional
            Number of workers, by default PARALLEL_WORKERS
        progress_bar : Optional[tqdm.tqdm], optional
            Progress bar, by default None

        Returns
        -------
        List[RequestJob]
            List of RequestJob objects with their responses filled in

        Examples
        --------
        >>> rh = RequestHandler()
        >>> results = rh.do_requests(jobs)
        >>> for job in results:
        ...     print(f'key: {job.key}, url: {job.url}, response: {job.response}')
        '''
        self._clear()

        self.logger.info("Starting requests")
        self.logger.debug(f"Number of jobs: {len(request_jobs)}")

        self._setup_jobs(request_jobs, num_workers, progress_bar)

        for worker in self.workers:
            self.logger.debug(f"Starting worker {worker.worker_id}")
            worker.start()

        # Wait for every worker to drain the queue and terminate.
        for worker in self.workers:
            worker.join()
            self.logger.debug(f"Joining worker {worker.worker_id}")

        # Combine results from all workers (copy before _clear drops them).
        workers_finalized_jobs = []
        for worker in self.workers:
            self.logger.debug(f"Worker {worker.worker_id} finished")
            workers_finalized_jobs.extend(worker.my_jobs.copy())

        self.logger.info("All requests finished")

        self._clear()
        return workers_finalized_jobs

    def do_request(self, job: RequestJob) -> RequestJob:
        '''
        Do a single request synchronously.

        Parameters
        ----------
        job : RequestJob
            The RequestJob object

        Returns
        -------
        RequestJob
            The RequestJob object with the response

        Examples
        --------
        >>> rh = RequestHandler()
        >>> job = RequestJob("single_job", "https://www.google.com")
        >>> result = rh.do_request(job)
        >>> print(f'key: {result.key}, url: {result.url}, response: {result.response}')
        '''
        self._clear()
        self._setup_job(job)

        # BUG FIX: the original constructed a second RequestWorker here and
        # started it, leaving the worker registered by _setup_job() unused.
        # Reuse the worker that _setup_job() already created.
        worker = self.workers[0]
        self.logger.debug(f"Starting worker {worker.worker_id}")

        self.logger.info(f"Starting request for {job.key}: {job.url}")
        worker.start()

        worker.join()
        self.logger.debug(f"Joining worker {worker.worker_id}")
        self.logger.info(f"Request for {job.key}: {job.url} finished")

        if worker.my_jobs[0].response is None:
            self.logger.info(f"Request for {job.key}: {job.url} failed: response is None")

        return worker.my_jobs[0]
class
RequestHandler:
class RequestHandler:
    '''
    Main class of the myrequests package.

    Dispatches a batch of RequestJob objects to a pool of RequestWorker
    threads that consume a shared queue until each receives an
    end-of-work signal.
    '''

    # Default number of concurrent workers used by do_requests().
    PARALLEL_WORKERS = 8

    def __init__(self):
        '''
        Constructor for RequestHandler class
        '''
        # NOTE(review): logger name is hard-coded; the Configuration-based
        # lookup was disabled upstream — confirm before re-enabling.
        self.logger = MyLogger.get_logger('logger_myrequests')
        self.logger.debug("Creating RequestHandler object")

        # jobs_queue: shared work queue consumed by the workers.
        self.jobs_queue = queue.Queue()
        self.num_workers = 1
        self.workers: List[RequestWorker] = []

    def _clear(self) -> None:
        '''
        Reset the RequestHandler object, releasing previous jobs/workers.
        '''
        del self.jobs_queue
        del self.workers
        gc.collect()

        self.jobs_queue = queue.Queue()
        self.num_workers = 1
        self.workers: List[RequestWorker] = []

    def _setup_jobs(
        self,
        request_jobs: List[RequestJob],
        num_workers: int,
        progress_bar: Optional[tqdm.tqdm] = None) -> None:
        '''
        Enqueue the jobs and create (but do not start) the workers.

        Parameters
        ----------
        request_jobs : List[RequestJob]
            List of RequestJob objects
        num_workers : int
            Number of workers
        progress_bar : Optional[tqdm.tqdm], optional
            Progress bar, by default None
        '''
        for job in request_jobs:
            self.jobs_queue.put(job)

        self.logger.debug(f"Jobs queue size: {self.jobs_queue.qsize()}")

        # Workers keep working until they receive an end-of-work signal,
        # so enqueue exactly one signal per worker.
        self.num_workers = num_workers
        for _ in range(self.num_workers):
            self.jobs_queue.put(RequestJob.end_job_signal())

        self.logger.debug("Creating workers")
        for i in range(self.num_workers):
            self.workers.append(RequestWorker(i, self.jobs_queue, progress_bar))

        self.logger.debug(f"Created workers: {len(self.workers)}")

    def _setup_job(self, request_job: RequestJob) -> None:
        '''
        Enqueue a single job and create (but do not start) one worker.

        Parameters
        ----------
        request_job : RequestJob
            The RequestJob object
        '''
        self.jobs_queue.put(request_job)
        # One worker -> one end-of-work signal.
        self.jobs_queue.put(RequestJob.end_job_signal())

        self.workers.append(RequestWorker(0, self.jobs_queue))
        self.logger.debug("Job created")

    def do_requests(
        self,
        request_jobs: List[RequestJob],
        num_workers: int = PARALLEL_WORKERS,
        progress_bar: Optional[tqdm.tqdm] = None) -> List[RequestJob]:
        '''
        Do the requests concurrently and return the completed jobs.

        Parameters
        ----------
        request_jobs : List[RequestJob]
            List of RequestJob objects
        num_workers : int, optional
            Number of workers, by default PARALLEL_WORKERS
        progress_bar : Optional[tqdm.tqdm], optional
            Progress bar, by default None

        Returns
        -------
        List[RequestJob]
            List of RequestJob objects with their responses filled in

        Examples
        --------
        >>> rh = RequestHandler()
        >>> results = rh.do_requests(jobs)
        >>> for job in results:
        ...     print(f'key: {job.key}, url: {job.url}, response: {job.response}')
        '''
        self._clear()

        self.logger.info("Starting requests")
        self.logger.debug(f"Number of jobs: {len(request_jobs)}")

        self._setup_jobs(request_jobs, num_workers, progress_bar)

        for worker in self.workers:
            self.logger.debug(f"Starting worker {worker.worker_id}")
            worker.start()

        # Wait for every worker to drain the queue and terminate.
        for worker in self.workers:
            worker.join()
            self.logger.debug(f"Joining worker {worker.worker_id}")

        # Combine results from all workers (copy before _clear drops them).
        workers_finalized_jobs = []
        for worker in self.workers:
            self.logger.debug(f"Worker {worker.worker_id} finished")
            workers_finalized_jobs.extend(worker.my_jobs.copy())

        self.logger.info("All requests finished")

        self._clear()
        return workers_finalized_jobs

    def do_request(self, job: RequestJob) -> RequestJob:
        '''
        Do a single request synchronously.

        Parameters
        ----------
        job : RequestJob
            The RequestJob object

        Returns
        -------
        RequestJob
            The RequestJob object with the response

        Examples
        --------
        >>> rh = RequestHandler()
        >>> job = RequestJob("single_job", "https://www.google.com")
        >>> result = rh.do_request(job)
        >>> print(f'key: {result.key}, url: {result.url}, response: {result.response}')
        '''
        self._clear()
        self._setup_job(job)

        # BUG FIX: the original constructed a second RequestWorker here and
        # started it, leaving the worker registered by _setup_job() unused.
        # Reuse the worker that _setup_job() already created.
        worker = self.workers[0]
        self.logger.debug(f"Starting worker {worker.worker_id}")

        self.logger.info(f"Starting request for {job.key}: {job.url}")
        worker.start()

        worker.join()
        self.logger.debug(f"Joining worker {worker.worker_id}")
        self.logger.info(f"Request for {job.key}: {job.url} finished")

        if worker.my_jobs[0].response is None:
            self.logger.info(f"Request for {job.key}: {job.url} failed: response is None")

        return worker.my_jobs[0]
Main class of the myrequests package. It handles the requests to the server using concurrent workers.
RequestHandler()
def __init__(self):
    '''
    Constructor for RequestHandler class
    '''
    # Obtain the package logger and announce construction.
    self.logger = MyLogger.get_logger('logger_myrequests')
    self.logger.debug("Creating RequestHandler object")

    # Start from a pristine state: empty queue, one worker by default,
    # no worker objects yet.
    self.jobs_queue = queue.Queue()
    self.num_workers = 1
    self.workers: List[RequestWorker] = []
Constructor for RequestHandler class
def
do_requests( self, request_jobs: List[olivia_finder.myrequests.job.RequestJob], num_workers: int = 8, progress_bar: Optional[tqdm.std.tqdm] = None) -> List[olivia_finder.myrequests.job.RequestJob]:
def do_requests(
    self,
    request_jobs: List[RequestJob],
    num_workers: int = PARALLEL_WORKERS,
    progress_bar: Optional[tqdm.tqdm] = None) -> List[RequestJob]:
    '''
    Run every job in *request_jobs* through a pool of workers and
    return the finished jobs.

    Parameters
    ----------
    request_jobs : List[RequestJob]
        List of RequestJob objects
    num_workers : int, optional
        Number of workers, by default PARALLEL_WORKERS
    progress_bar : Optional[tqdm.tqdm], optional
        Progress bar, by default None

    Returns
    -------
    List[RequestJob]
        The jobs, with responses attached by the workers

    Examples
    --------
    >>> rh = RequestHandler()
    >>> results = rh.do_requests(jobs)
    >>> for job in results:
    ...     print(f'key: {job.key}, url: {job.url}, response: {job.response}')
    '''
    # Fresh state for this batch.
    self._clear()

    self.logger.info("Starting requests")
    self.logger.debug(f"Number of jobs: {len(request_jobs)}")

    self._setup_jobs(request_jobs, num_workers, progress_bar)

    # Launch every worker thread.
    for w in self.workers:
        self.logger.debug(f"Starting worker {w.worker_id}")
        w.start()

    # Block until each worker has drained its share of the queue.
    for w in self.workers:
        w.join()
        self.logger.debug(f"Joining worker {w.worker_id}")

    # Gather per-worker results into one flat list.
    finished_jobs = []
    for w in self.workers:
        self.logger.debug(f"Worker {w.worker_id} finished")
        finished_jobs.extend(w.my_jobs.copy())

    self.logger.info("All requests finished")

    # Release queue and workers before handing back the results.
    self._clear()
    return finished_jobs
Do the requests
Parameters
- request_jobs (List[RequestJob]): List of RequestJob objects
- num_workers (int, optional): Number of workers, by default PARALLEL_WORKERS
- progress_bar (Optional[tqdm.tqdm], optional): Progress bar, by default None
Returns
- list: List of RequestJob objects
Examples
>>> rh = RequestHandler()
>>> results = rh.do_requests(jobs)
>>> for job in results:
...     print(f'key: {job.key}, url: {job.url}, response: {job.response}')
def do_request(self, job: RequestJob) -> RequestJob:
    '''
    Do a single request synchronously.

    Parameters
    ----------
    job : RequestJob
        The RequestJob object

    Returns
    -------
    RequestJob
        The RequestJob object with the response

    Examples
    --------
    >>> rh = RequestHandler()
    >>> job = RequestJob("single_job", "https://www.google.com")
    >>> result = rh.do_request(job)
    >>> print(f'key: {result.key}, url: {result.url}, response: {result.response}')
    '''
    # Fresh state, then enqueue the job plus its end-of-work signal.
    self._clear()
    self._setup_job(job)

    # BUG FIX: the original constructed a second RequestWorker here and
    # started it, leaving the worker registered by _setup_job() unused.
    # Reuse the worker that _setup_job() already created.
    worker = self.workers[0]
    self.logger.debug(f"Starting worker {worker.worker_id}")

    self.logger.info(f"Starting request for {job.key}: {job.url}")
    worker.start()

    # Wait for the worker to finish before reading its result.
    worker.join()
    self.logger.debug(f"Joining worker {worker.worker_id}")
    self.logger.info(f"Request for {job.key}: {job.url} finished")

    if worker.my_jobs[0].response is None:
        self.logger.info(f"Request for {job.key}: {job.url} failed: response is None")

    return worker.my_jobs[0]
Do a single request
Parameters
- job (RequestJob): The RequestJob object
Returns
- RequestJob: The RequestJob object with the response
Examples
>>> rh = RequestHandler()
>>> job = RequestJob("single_job", "https://www.google.com")
>>> result = rh.do_request(job)
>>> print(f'key: {result.key}, url: {result.url}, response: {result.response}')