wily\commands\build.py
-- -- --- --- --- --- --- --- ------- ------- ------- | """
-- -- --- --- --- --- --- --- ------- ------- ------- | Builds a cache based on a source-control history.
-- -- --- --- --- --- --- --- ------- ------- ------- |
-- -- --- --- --- --- --- --- ------- ------- ------- | TODO : Convert .gitignore to radon ignore patterns to make the build more efficient.
-- -- --- --- --- --- --- --- ------- ------- ------- |
-- -- --- --- --- --- --- --- ------- ------- ------- | """
import multiprocessing
import pathlib
import sys

from progress.bar import Bar

from wily import logger
from wily.archivers import FilesystemArchiver
from wily.archivers.git import InvalidGitRepositoryError
from wily.operators import resolve_operator
from wily.state import State
-- -- --- --- --- --- --- --- ------- ------- ------- |
-- -- --- --- --- --- --- --- ------- ------- ------- |
def run_operator(operator, revision, config):
    """
    Run a single operator against one revision.

    Invoked by the multiprocessing pool in :func:`build`; not called directly.

    :param operator: The operator descriptor (exposes ``cls`` and ``name``).
    :param revision: The revision to analyse.
    :param config: The wily configuration handed to the operator instance.

    :return: Tuple of ``(operator name, collected data)``.
    """
    # Instantiate the operator's implementation class first, then run it.
    analyser = operator.cls(config)
    logger.debug(f"Running {operator.name} operator on {revision.key}")
    result = analyser.run(revision, config)
    return (operator.name, result)
-- -- --- --- --- --- --- --- ------- ------- ------- |
-- -- --- --- --- --- --- --- ------- ------- ------- |
def build(config, archiver, operators):
    """
    Build the history given an archiver and collection of operators.

    Resolves the revision list from the archiver (falling back to the
    filesystem archiver when the path is not a git repository), runs every
    operator against each new revision in a process pool, aggregates
    per-file metrics up to their parent directories, and stores the results
    in the wily cache index.

    :param config: The wily configuration
    :type config: :namedtuple:`wily.config.WilyConfig`

    :param archiver: The archiver to use
    :type archiver: :namedtuple:`wily.archivers.Archiver`

    :param operators: The list of operators to execute
    :type operators: `list` of :namedtuple:`wily.operators.Operator`
    """
    try:
        logger.debug(f"Using {archiver.name} archiver module")
        archiver = archiver.cls(config)
        revisions = archiver.revisions(config.path, config.max_revisions)
    except InvalidGitRepositoryError:
        # TODO: This logic shouldn't really be here (SoC)
        logger.info("Defaulting back to the filesystem archiver, not a valid git repo")
        archiver = FilesystemArchiver(config)
        revisions = archiver.revisions(config.path, config.max_revisions)
    except Exception as e:
        # Keep the historic message shape whether or not the exception
        # exposes a ``.message`` attribute.
        message = getattr(e, "message", f"{type(e)} - {e}")
        logger.error(f"Failed to setup archiver: '{message}'")
        # sys.exit instead of the site builtin `exit`, which is not
        # guaranteed to exist in non-interactive interpreters.
        sys.exit(1)

    state = State(config, archiver=archiver)
    # Check for existence of cache, else provision
    state.ensure_exists()

    index = state.index[archiver.name]

    # Remove revisions that are already in the index so only new work runs.
    revisions = [revision for revision in revisions if revision not in index]

    logger.info(
        f"Found {len(revisions)} revisions from '{archiver.name}' archiver in '{config.path}'."
    )

    _op_desc = ",".join([operator.name for operator in operators])
    logger.info(f"Running operators - {_op_desc}")

    bar = Bar("Processing", max=len(revisions) * len(operators))
    state.operators = operators
    try:
        with multiprocessing.Pool(processes=len(operators)) as pool:
            for revision in revisions:
                # Checkout target revision
                archiver.checkout(revision, config.checkout_options)
                stats = {"operator_data": {}}

                # Run each operator as a separate process
                data = pool.starmap(
                    run_operator,
                    [(operator, revision, config) for operator in operators],
                )

                # ``data`` is one (operator_name, result) tuple per operator,
                # in the same order as ``operators``; warn when an operator
                # collected nothing for this revision.
                for operator, (_name, result) in zip(operators, data):
                    if len(result) == 0:
                        logger.warning(
                            f"In revision {revision.key}, for operator {operator.name}: No data collected"
                        )

                # Map the data back into a dictionary
                for operator_name, result in data:
                    # aggregate values to directories
                    roots = []

                    # find all unique directories in the results
                    for entry in result.keys():
                        parent = pathlib.Path(entry).parents[0]
                        if parent not in roots:
                            roots.append(parent)

                    for root in roots:
                        # find all matching entries recursively
                        aggregates = [
                            path
                            for path in result.keys()
                            if root in pathlib.Path(path).parents
                        ]
                        result[str(root)] = {"total": {}}
                        # aggregate values per metric using the metric's own
                        # aggregation function (e.g. sum, mean)
                        for metric in resolve_operator(operator_name).cls.metrics:
                            func = metric.aggregate
                            values = [
                                result[aggregate]["total"][metric.name]
                                for aggregate in aggregates
                                if aggregate in result
                                and metric.name in result[aggregate]["total"]
                            ]
                            if values:
                                result[str(root)]["total"][metric.name] = func(values)

                    stats["operator_data"][operator_name] = result
                    bar.next()

                ir = index.add(revision, operators=operators)
                ir.store(config, archiver, stats)
        index.save()
        bar.finish()
    except Exception as e:
        logger.error(f"Failed to build cache: '{e}'")
        # Bare ``raise`` preserves the original traceback; ``raise e``
        # would restart it from this frame.
        raise
    finally:
        # Reset the archive after every run back to the head of the branch
        archiver.finish()