src\wily\commands\build.py
-- -- --- --- --- --- --- --- ------- ------- ------- | """
-- -- --- --- --- --- --- --- ------- ------- ------- | Builds a cache based on a source-control history.
-- -- --- --- --- --- --- --- ------- ------- ------- |
-- -- --- --- --- --- --- --- ------- ------- ------- | TODO : Convert .gitignore to radon ignore patterns to make the build more efficient.
-- -- --- --- --- --- --- --- ------- ------- ------- |
-- -- --- --- --- --- --- --- ------- ------- ------- | """
-- -- --- --- --- --- --- --- ------- ------- ------- | import multiprocessing
-- -- --- --- --- --- --- --- ------- ------- ------- | import os
-- -- --- --- --- --- --- --- ------- ------- ------- | import pathlib
-- -- --- --- --- --- --- --- ------- ------- ------- | from sys import exit
-- -- --- --- --- --- --- --- ------- ------- ------- | from typing import Any, Dict, List, Tuple
-- -- --- --- --- --- --- --- ------- ------- ------- |
-- -- --- --- --- --- --- --- ------- ------- ------- | from progress.bar import Bar
-- -- --- --- --- --- --- --- ------- ------- ------- |
-- -- --- --- --- --- --- --- ------- ------- ------- | from wily import logger
-- -- --- --- --- --- --- --- ------- ------- ------- | from wily.archivers import Archiver, FilesystemArchiver, Revision
-- -- --- --- --- --- --- --- ------- ------- ------- | from wily.archivers.git import InvalidGitRepositoryError
-- -- --- --- --- --- --- --- ------- ------- ------- | from wily.config.types import WilyConfig
-- -- --- --- --- --- --- --- ------- ------- ------- | from wily.operators import Operator, resolve_operator
-- -- --- --- --- --- --- --- ------- ------- ------- | from wily.state import State
-- -- --- --- --- --- --- --- ------- ------- ------- |
-- -- --- --- --- --- --- --- ------- ------- ------- |
def run_operator(
    operator: Operator, revision: Revision, config: WilyConfig, targets: List[str]
) -> Tuple[str, Dict[str, Any]]:
    """
    Run an operator for the multiprocessing pool.

    :param operator: The operator to use
    :param revision: The revision index
    :param config: The runtime configuration
    :param targets: Files/paths to scan
    :return: Tuple of (operator name, mapping of path -> collected metrics)
    """
    operator_instance = operator.operator_cls(config, targets)
    logger.debug(f"Running {operator.name} operator on {revision}")

    results = operator_instance.run(revision, config)

    # Normalize paths for non-seed passes: rewrite any absolute keys as
    # paths relative to the project root so cache entries stay comparable.
    absolute_paths = [path for path in results if os.path.isabs(path)]
    for path in absolute_paths:
        results[os.path.relpath(path, config.path)] = results.pop(path)

    return operator.name, results
-- -- --- --- --- --- --- --- ------- ------- ------- |
-- -- --- --- --- --- --- --- ------- ------- ------- |
def build(config: WilyConfig, archiver: Archiver, operators: List[Operator]) -> None:
    """
    Build the history given an archiver and collection of operators.

    Walks every revision reported by the archiver (oldest first), runs each
    operator in a worker process against the files changed in that revision,
    carries results for unchanged files forward from the previous revision,
    aggregates per-directory totals and stores everything in the cache index.

    :param config: The wily configuration
    :param archiver: The archiver to use
    :param operators: The list of operators to execute
    """
    try:
        logger.debug(f"Using {archiver.name} archiver module")
        archiver_instance = archiver.archiver_cls(config)
        revisions = archiver_instance.revisions(config.path, config.max_revisions)
    except InvalidGitRepositoryError:
        # TODO: This logic shouldn't really be here (SoC)
        logger.info("Defaulting back to the filesystem archiver, not a valid git repo")
        archiver_instance = FilesystemArchiver(config)
        revisions = archiver_instance.revisions(config.path, config.max_revisions)
    except Exception as e:
        message = getattr(e, "message", f"{type(e)} - {e}")
        logger.error(f"Failed to setup archiver: '{message}'")
        exit(1)

    state = State(config, archiver=archiver_instance)
    # Check for existence of cache, else provision
    state.ensure_exists()

    index = state.index[archiver_instance.name]

    # Remove revisions already in the index and reverse so we process
    # oldest-first (later revisions reuse results from earlier ones).
    revisions = [revision for revision in revisions if revision not in index][::-1]

    logger.info(
        f"Found {len(revisions)} revisions from '{archiver_instance.name}' archiver in '{config.path}'."
    )

    _op_desc = ",".join([operator.name for operator in operators])
    logger.info(f"Running operators - {_op_desc}")

    bar = Bar("Processing", max=len(revisions) * len(operators))
    state.operators = operators

    # Index all files the first time, only scan changes afterward
    seed = True
    try:
        with multiprocessing.Pool(processes=len(operators)) as pool:
            prev_stats: Dict[str, Any] = {}
            for revision in revisions:
                # Checkout target revision
                archiver_instance.checkout(revision, config.checkout_options)
                stats: Dict[str, Any] = {"operator_data": {}}

                # TODO : Check that changed files are children of the targets
                targets = [
                    str(pathlib.Path(config.path) / pathlib.Path(file))
                    for file in revision.added_files + revision.modified_files
                    # if any([True for target in config.targets if
                    # target in pathlib.Path(pathlib.Path(config.path) / pathlib.Path(file)).parents])
                ]

                # Run each operator as a separate process
                data = pool.starmap(
                    run_operator,
                    [(operator, revision, config, targets) for operator in operators],
                )
                # data is a list of (operator name, collected data) tuples in
                # the same order as `operators`; warn when an operator
                # collected nothing for this revision.
                for operator, op_result in zip(operators, data):
                    if len(op_result) >= 2 and len(op_result[1]) == 0:
                        logger.warning(
                            f"In revision {revision.key}, for operator {operator.name}: No data collected"
                        )

                # Map the data back into a dictionary
                for operator_name, result in data:
                    # Copy the ir from any unchanged files from the prev revision
                    # (on a seed run every tracked file was scanned, so there
                    # is nothing to carry forward).
                    if not seed:
                        indices = set(result.keys())
                        missing_indices = {
                            str(pathlib.Path(fn)) for fn in revision.tracked_files
                        } - indices
                        # TODO: Check existence of file path.
                        for missing in missing_indices:
                            # Don't copy aggregate keys as their values may have changed
                            if missing in revision.tracked_dirs:
                                continue
                            # previous index may not have that operator
                            if operator_name not in prev_stats["operator_data"]:
                                continue
                            # previous index may not have file either
                            if (
                                missing
                                not in prev_stats["operator_data"][operator_name]
                            ):
                                continue
                            result[missing] = prev_stats["operator_data"][
                                operator_name
                            ][missing]
                        # Deleted files must not be carried forward
                        for deleted in revision.deleted_files:
                            result.pop(deleted, None)

                    # Aggregate metrics across all root paths using the aggregate function in the metric
                    # Note assumption is that nested dirs are listed after parent..
                    for root in sorted(
                        {str(pathlib.Path(dn)) for dn in revision.tracked_dirs}
                    ):
                        # find all matching entries recursively
                        aggregates = [
                            path for path in result.keys() if path.startswith(root)
                        ]

                        result[str(root)] = {"total": {}}
                        # aggregate values
                        for metric in resolve_operator(
                            operator_name
                        ).operator_cls.metrics:
                            func = metric.aggregate
                            values = [
                                result[aggregate]["total"][metric.name]
                                for aggregate in aggregates
                                if aggregate in result
                                and metric.name in result[aggregate]["total"]
                            ]
                            if len(values) > 0:
                                result[str(root)]["total"][metric.name] = func(values)

                    stats["operator_data"][operator_name] = result
                    bar.next()

                prev_stats = stats
                seed = False
                ir = index.add(revision, operators=operators)
                ir.store(config, archiver_instance.name, stats)
            index.save()
            bar.finish()
    except Exception as e:
        logger.error(f"Failed to build cache: {type(e)}: '{e}'")
        # Bare raise preserves the original traceback without adding a frame
        raise
    finally:
        # Reset the archive after every run back to the head of the branch
        archiver_instance.finish()