Source code for despasito.utils.parallelization

"""

Parallelization utilities to manage multiprocessing worker pools and their logging.

"""

import numpy as np
import multiprocessing
import logging
import logging.handlers
import os
import glob

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)


class MultiprocessingJob:
    """
    This object initiates the pool for multiprocessing jobs.

    With ``ncores == 1`` no pool is created and jobs run serially in the
    calling process. Otherwise a "spawn" pool is started whose workers each
    write to their own ``mp-handler-<pid>.log`` file in the current working
    directory. Those files are folded into the main log after every
    :func:`pool_job` call and deleted by :func:`end_pool`.

    Parameters
    ----------
    ncores : int, Optional, default=-1
        Number of processes used. If the default value of -1, the system cpu
        count is used.

    Attributes
    ----------
    flag_use_mp : bool
        True when a multiprocessing pool is used, False for serial execution.
    ncores : int
        Number of processes actually requested (cpu count when -1 was given).
    logfiles : list[str]
        Names of the per-worker log files; empty in serial mode.

    Raises
    ------
    ValueError
        If ``ncores`` is zero or negative (other than the -1 sentinel).
    """

    def __init__(self, ncores=-1):
        multiprocessing.freeze_support()

        self.flag_use_mp = True
        if ncores == -1:
            ncores = multiprocessing.cpu_count()  # includes logical cores!
            logger.info("Detected {} cores".format(ncores))
        elif ncores > 1:
            logger.info("Number of cores set to {}".format(ncores))
        elif ncores == 1:
            self.flag_use_mp = False
            logger.info("Number of cores set to 1, bypassing mp and using serial methods")
        else:
            raise ValueError("Number of cores cannot be zero or negative.")
        self.ncores = ncores

        # Always defined so the log helpers are safe to call in serial mode.
        self.logfiles = []

        if self.flag_use_mp:
            # Capture the format/level of the main file log so that the
            # workers can log in the same style.
            self._extract_root_logging()

            # Initiate multiprocessing
            ctx = multiprocessing.get_context("spawn")
            self._pool = ctx.Pool(
                ncores,
                initializer=self._initialize_mp_handler,
                initargs=(self._level, self._logformat),
            )

            # The worker PIDs are only reachable through the pool's private
            # worker list; they determine the log file names the workers use.
            self.logfiles = [
                "mp-handler-{0}.log".format(worker.pid) for worker in self._pool._pool
            ]
            logger.info("MP log files: {}".format(", ".join(self.logfiles)))

    def _extract_root_logging(self):
        """Record the format and level of the root logger's file handler.

        Sets ``self._logformat`` and ``self._level`` from the last root
        handler that has a ``baseFilename`` attribute (i.e. a file-backed
        handler). Both are None when no such handler exists, and
        ``self._logformat`` is None when that handler has no formatter.
        """
        self._logformat = None
        self._level = None
        for handler in logging.root.handlers:
            if "baseFilename" in vars(handler):
                formatter = handler.formatter
                # A handler without an explicit formatter has formatter=None.
                self._logformat = None if formatter is None else formatter._fmt
                self._level = handler.level

    @staticmethod
    def _initialize_mp_handler(level, logformat):
        """Attach a process-specific file handler to the "despasito" logger.

        Used as the pool initializer, so it runs once in every worker process
        and writes to ``mp-handler-<pid>.log``.

        Parameters
        ----------
        level : int
            The verbosity level of logging information can be set to any
            supported representation of the `logging level <LOGGING>`_ .
            If None, the logger and handler levels are left untouched.
        logformat : str
            Formating of logging information can be set to any supported
            representation of the `formatting class <FORMAT>`_. If None, the
            handler's default format is used.

        .. _LOGGING: https://docs.python.org/3/library/logging.html#logging-levels/
        .. _FORMAT: https://docs.python.org/3/library/logging.html#logging.Formatter/
        """
        logger = logging.getLogger("despasito")

        pid = os.getpid()
        filename = "mp-handler-{0}.log".format(pid)
        handler = logging.handlers.RotatingFileHandler(filename)
        if level is not None:
            logger.setLevel(level)
            handler.setLevel(level)
        if logformat is not None:
            handler.setFormatter(logging.Formatter(logformat))
        logger.addHandler(handler)

    def pool_job(self, func, inputs):
        """
        This function will setup and dispatch thermodynamic or parameter
        fitting jobs.

        Parameters
        ----------
        func : function
            Function used in job
        inputs : list[tuple]
            Each entry of this list contains the input arguments for each job

        Returns
        -------
        output : tuple
            This structure contains the outputs of the jobs given, grouped by
            output position rather than by job. In serial mode this is the
            object array produced by :func:`serial_job`.
        """
        if self.flag_use_mp:
            # Materialize the transposed results: a bare ``zip`` object can
            # only be iterated once and cannot be indexed.
            output = tuple(zip(*self._pool.map(func, inputs)))
            self._consolidate_mp_logs()
        else:
            logger.info("Performing task serially")
            output = self.serial_job(func, inputs)

        return output

    @staticmethod
    def serial_job(func, inputs):
        """
        This function will serially perform thermodynamic jobs.

        Parameters
        ----------
        func : function
            Function used in job
        inputs : tuple
            The input arguments for this job

        Returns
        -------
        output : numpy.ndarray
            Object array holding the outputs of the jobs given, transposed
            with ``numpy.transpose`` so that, for jobs returning a tuple of
            scalars, each row collects one output position across all jobs.
        """
        output = np.array([func(finput) for finput in inputs], dtype=object)
        return np.transpose(output)

    def _consolidate_mp_logs(self):
        """Consolidate multiprocessing logs into main log.

        Each worker log is copied into the main log and then truncated so the
        same records are not reported again after the next job.
        """
        for i, fn in enumerate(self.logfiles):
            try:
                with open(fn) as f:
                    contents = f.read()
            except FileNotFoundError:
                # Nothing to merge if the file is not there.
                continue
            logger.info("Log from thread {0}:\n{1}".format(i, contents))
            # Opening in "w" mode truncates; the context manager closes the handle.
            with open(fn, "w"):
                pass

    def _remove_mp_logs(self):
        """Ensure all previous mp logs are removed"""
        for fn in self.logfiles:
            try:
                os.remove(fn)
            except FileNotFoundError:
                # Already gone (e.g. end_pool called twice); nothing to do.
                pass

    def end_pool(self):
        """Close multiprocessing pool and delete the worker log files."""
        if self.flag_use_mp:
            self._pool.close()
            self._pool.join()
            self._remove_mp_logs()
def initialize_mp_handler(level, logformat):
    """Attach a process-specific file handler to the "despasito" logger.

    The handler writes to ``mp-handler-<pid>.log`` in the current working
    directory, where ``<pid>`` is the id of the calling process.

    Parameters
    ----------
    level : int
        The verbosity level of logging information can be set to any supported
        representation of the `logging level <LOGGING>`_ . If None, the
        handler level is left at its default.
    logformat : str
        Formating of logging information can be set to any supported
        representation of the `formatting class <FORMAT>`_ . If None, the
        handler's default format is used.
    """
    logger = logging.getLogger("despasito")

    filename = "mp-handler-{0}.log".format(os.getpid())
    handler = logging.handlers.RotatingFileHandler(filename)
    if logformat is not None:
        handler.setFormatter(logging.Formatter(logformat))
    # Handler.setLevel(None) raises TypeError, so only set an explicit level.
    if level is not None:
        handler.setLevel(level)
    logger.addHandler(handler)
def batch_jobs(func, inputs, ncores=1, logger=None):
    """
    This function will setup and dispatch thermodynamic jobs.

    A pool of ``ncores`` processes is created for this call only. Each worker
    logs to its own ``mp-handler-<pid>.log`` file; once the jobs are done those
    files are copied into ``logger`` and deleted.

    Parameters
    ----------
    func : function
        Function used in job
    inputs : list
        Each entry of this list contains the input arguments for each job
    ncores : int, Optional, default=1
        Number of processes used.
    logger : class, Optional, default=None
        The logger object used. If None, the root logger is used.

    Returns
    -------
    output : tuple
        This structure contains the outputs of the jobs given, grouped by
        output position rather than by job.
    """
    if logger is None:
        logger = logging.getLogger()

    root_handlers = logging.root.handlers

    # Defaults used when the root logger has no file-backed handler; without
    # them the pool creation below would fail on unbound names.
    logformat = None
    level = logging.NOTSET
    for handler in root_handlers:
        if "baseFilename" in vars(handler):
            formatter = handler.formatter
            logformat = None if formatter is None else formatter._fmt
            level = handler.level

    # The root handlers are detached while the pool exists and are restored
    # afterwards, even if a job raises.
    logging.root.handlers = []
    try:
        pool = multiprocessing.Pool(
            ncores, initializer=initialize_mp_handler, initargs=(level, logformat)
        )
        try:
            # Materialize the transposed results: a bare ``zip`` object can
            # only be iterated once and cannot be indexed.
            output = tuple(zip(*pool.map(func, inputs)))
        finally:
            # Shut the workers down so they do not outlive this call.
            pool.close()
            pool.join()
    finally:
        logging.root.handlers = root_handlers

    # NOTE: the glob picks up every matching file in the working directory,
    # including any left over from an earlier run.
    for i, fn in enumerate(glob.glob("./mp-handler-*.log")):
        with open(fn) as f:
            logger.info("Log from thread {0}:\n{1}".format(i, f.read()))
        os.remove(fn)

    return output