"""
Builds a cache based on a source-control history.

TODO : Convert .gitignore to radon ignore patterns to make the build more efficient.

"""
import pathlib
import multiprocessing
from progress.bar import Bar

from wily import logger
from wily.state import State

from wily.archivers.git import InvalidGitRepositoryError
from wily.archivers import FilesystemArchiver

from wily.operators import resolve_operator
def run_operator(operator, revision, config):
    """
    Instantiate one operator and collect its data for a single revision.

    Worker target for the multiprocessing pool in :func:`build`;
    not called directly.

    :param operator: The operator descriptor whose ``cls`` is instantiated.
    :param revision: The revision to analyse.
    :param config: The wily configuration handed to the operator instance.

    :return: Tuple of ``(operator name, collected data)``.
    """
    worker = operator.cls(config)
    logger.debug(f"Running {operator.name} operator on {revision.key}")
    result = worker.run(revision, config)
    return operator.name, result
def build(config, archiver, operators):
    """
    Build the history given an archiver and collection of operators.

    Resolves the archiver (falling back to the filesystem archiver when the
    path is not a valid git repository), provisions the cache/state, then for
    each unseen revision checks it out, fans the operators out over a
    multiprocessing pool, aggregates per-file metrics up to their parent
    directories and stores the result in the index.

    :param config: The wily configuration
    :type config: :namedtuple:`wily.config.WilyConfig`

    :param archiver: The archiver to use
    :type archiver: :namedtuple:`wily.archivers.Archiver`

    :param operators: The list of operators to execute
    :type operators: `list` of :namedtuple:`wily.operators.Operator`
    """
    try:
        logger.debug(f"Using {archiver.name} archiver module")
        archiver = archiver.cls(config)
        revisions = archiver.revisions(config.path, config.max_revisions)
    except InvalidGitRepositoryError:
        # TODO: This logic shouldn't really be here (SoC)
        logger.info("Defaulting back to the filesystem archiver, not a valid git repo")
        archiver = FilesystemArchiver(config)
        revisions = archiver.revisions(config.path, config.max_revisions)
    except Exception as e:
        # Preserve the original messages: prefer ``e.message`` when the
        # exception type provides one, otherwise show the type and str().
        if hasattr(e, "message"):
            logger.error(f"Failed to setup archiver: '{e.message}'")
        else:
            logger.error(f"Failed to setup archiver: '{type(e)} - {e}'")
        exit(1)

    state = State(config, archiver=archiver)
    # Check for existence of cache, else provision
    state.ensure_exists()

    index = state.index[archiver.name]

    # remove existing revisions from the list
    revisions = [revision for revision in revisions if revision not in index]

    logger.info(
        f"Found {len(revisions)} revisions from '{archiver.name}' archiver in '{config.path}'."
    )

    _op_desc = ",".join(operator.name for operator in operators)
    logger.info(f"Running operators - {_op_desc}")

    bar = Bar("Processing", max=len(revisions) * len(operators))
    state.operators = operators
    try:
        with multiprocessing.Pool(processes=len(operators)) as pool:
            for revision in revisions:
                # Checkout target revision
                archiver.checkout(revision, config.checkout_options)
                stats = {"operator_data": {}}

                # Run each operator as a separate process
                data = pool.starmap(
                    run_operator,
                    [(operator, revision, config) for operator in operators],
                )

                # ``data`` is a list of ``(operator name, result)`` tuples,
                # one per operator; warn when an operator collected nothing.
                for operator, (_, result) in zip(operators, data):
                    if not result:
                        logger.warning(
                            f"In revision {revision.key}, for operator {operator.name}: No data collected"
                        )

                # Map the data back into a dictionary
                for operator_name, result in data:
                    # aggregate values to directories:
                    # find all unique parent directories in the results
                    roots = []
                    for entry in result.keys():
                        parent = pathlib.Path(entry).parents[0]
                        if parent not in roots:
                            roots.append(parent)

                    for root in roots:
                        # find all matching entries recursively
                        aggregates = [
                            path
                            for path in result.keys()
                            if root in pathlib.Path(path).parents
                        ]
                        result[str(root)] = {"total": {}}
                        # aggregate values for every metric the operator exposes
                        for metric in resolve_operator(operator_name).cls.metrics:
                            func = metric.aggregate
                            values = [
                                result[aggregate]["total"][metric.name]
                                for aggregate in aggregates
                                if aggregate in result
                                and metric.name in result[aggregate]["total"]
                            ]
                            if values:
                                result[str(root)]["total"][metric.name] = func(values)

                    stats["operator_data"][operator_name] = result
                    bar.next()

                ir = index.add(revision, operators=operators)
                ir.store(config, archiver, stats)
        index.save()
        bar.finish()
    except Exception as e:
        logger.error(f"Failed to build cache: '{e}'")
        # bare raise keeps the original traceback intact
        raise
    finally:
        # Reset the archive after every run back to the head of the branch
        archiver.finish()