+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+import shlex
+import sys
+import faulthandler
+faulthandler.enable(file=sys.__stderr__) # will catch segfaults and write to stderr
+
+from lib.venv_checker import check_venv
+check_venv() # this check must run even before __main__, as the imports below might not resolve outside the virtualenv
+
+import subprocess
+import json
+import os
+import time
+from html import escape
+import importlib
+import re
+from pathlib import Path
+import random
+import shutil
+import yaml
+from collections import OrderedDict
+from datetime import datetime
+import platform
+
+GMT_ROOT_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), '../')
+
+from lib import utils
+from lib import process_helpers
+from lib import hardware_info
+from lib import hardware_info_root
+from lib import error_helpers
+from lib.repo_info import get_repo_info
+from lib.debug_helper import DebugHelper
+from lib.terminal_colors import TerminalColors
+from lib.schema_checker import SchemaChecker
+from lib.db import DB
+from lib.global_config import GlobalConfig
+from lib.notes import Notes
+from lib import system_checks
+from lib.machine import Machine
+from lib import metric_importer
+
+def arrows(text):
+ return f"\n\n>>>> {text} <<<<\n\n"
+
+class ScenarioRunner:
+ def __init__(self,
+ *, uri, uri_type, name=None, filename='usage_scenario.yml', branch=None,
+ debug_mode=False, allow_unsafe=False, skip_system_checks=False,
+ skip_unsafe=False, verbose_provider_boot=False, full_docker_prune=False,
+ dev_no_sleeps=False, dev_cache_build=False, dev_no_metrics=False,
+ dev_flow_timetravel=False, dev_no_optimizations=False, docker_prune=False, job_id=None,
+ user_id=1, measurement_flow_process_duration=None, measurement_total_duration=None, disabled_metric_providers=None, allowed_run_args=None, dev_no_phase_stats=False,
+ skip_volume_inspect=False, commit_hash_folder=''):
+
+ if skip_unsafe is True and allow_unsafe is True:
+ raise RuntimeError('Cannot specify both --skip-unsafe and --allow-unsafe')
+
+ # variables that should not change if you call run multiple times
+ if name:
+ self._name = name
+ else:
+ self._name = f"Run {datetime.now()}"
+ self._debugger = DebugHelper(debug_mode)
+ self._allow_unsafe = allow_unsafe
+ self._skip_unsafe = skip_unsafe
+ self._skip_system_checks = skip_system_checks
+ self._skip_volume_inspect = skip_volume_inspect
+ self._verbose_provider_boot = verbose_provider_boot
+ self._full_docker_prune = full_docker_prune
+ self._docker_prune = docker_prune
+ self._dev_no_sleeps = dev_no_sleeps
+ self._dev_cache_build = dev_cache_build
+ self._dev_no_metrics = dev_no_metrics
+ self._dev_flow_timetravel = dev_flow_timetravel
+ self._dev_no_optimizations = dev_no_optimizations
+ self._dev_no_phase_stats = dev_no_phase_stats
+ self._uri = uri
+ self._uri_type = uri_type
+ self._original_filename = filename
+ self._branch = branch
+ self._tmp_folder = Path('/tmp/green-metrics-tool').resolve() # since linux has /tmp and macos /private/tmp
+ self._usage_scenario = {}
+ self._architecture = utils.get_architecture()
+
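+        # Defaults for the SCI calculation: 'R' (number of functional units) starts at 0; 'R_d' (the functional unit description) is filled from the usage_scenario later in initial_parse(). Machine-specific factors like I and TE are merged in from the config below.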
+        self._sci = {'R_d': None, 'R': 0}
+        self._sci |= GlobalConfig().config.get('sci', {}) # merge in data from the machine config like I, TE etc.
+
+ self._job_id = job_id
+ self._arguments = locals()
+ self._repo_folder = f"{self._tmp_folder}/repo" # default if not changed in checkout_repository
+ self._run_id = None
+ self._commit_hash = None
+ self._commit_timestamp = None
+ self._commit_hash_folder = commit_hash_folder
+ self._user_id = user_id
+ self._measurement_flow_process_duration = measurement_flow_process_duration
+ self._measurement_total_duration = measurement_total_duration
+ self._disabled_metric_providers = [] if disabled_metric_providers is None else disabled_metric_providers
+ self._allowed_run_args = [] if allowed_run_args is None else allowed_run_args # They are specific to the orchestrator. However currently we only have one. As soon as we support more orchestrators we will sub-class Runner with dedicated child classes (DockerRunner, PodmanRunner etc.)
+ self._last_measurement_duration = 0
+
+        del self._arguments['self'] # self is not needed and also cannot be serialized, so we remove it
+
+
+ # transient variables that are created by the runner itself
+ # these are accessed and processed on cleanup and then reset
+        # They are prefixed with __ (name-mangled) so they are not modified from outside, which could break the state of the runner
+ self.__stdout_logs = OrderedDict()
+ self.__containers = {}
+ self.__networks = []
+ self.__ps_to_kill = []
+ self.__ps_to_read = []
+ self.__metric_providers = []
+ self.__notes_helper = Notes()
+ self.__phases = OrderedDict()
+ self.__start_measurement_seconds = None
+ self.__start_measurement = None
+ self.__end_measurement = None
+ self.__services_to_pause_phase = {}
+ self.__join_default_network = False
+ self.__docker_params = []
+ self.__working_folder = self._repo_folder
+ self.__working_folder_rel = ''
+ self.__image_sizes = {}
+ self.__volume_sizes = {}
+
+ # we currently do not use this variable
+ # self.__filename = self._original_filename # this can be changed later if working directory changes
+
+ def custom_sleep(self, sleep_time):
+ if not self._dev_no_sleeps:
+ print(TerminalColors.HEADER, '\nSleeping for : ', sleep_time, TerminalColors.ENDC)
+ time.sleep(sleep_time)
+
+ def get_optimizations_ignore(self):
+ return self._usage_scenario.get('optimizations_ignore', [])
+
+ # This function takes a path and a file and joins them while making sure that no one is trying to escape the
+ # path with `..`, symbolic links or similar.
+    # Error messages always reference the path and file parameters, never the resolved `filename`, as
+    # otherwise we might disclose whether certain files exist or not.
+ def join_paths(self, path, path2, force_path_as_root=False):
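+        # realpath resolves symlinks and normalizes '..' components, so the prefix checks below operate on the real location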
+ filename = os.path.realpath(os.path.join(path, path2))
+
+ # If the original path is a symlink we need to resolve it.
+ path = os.path.realpath(path)
+
+ # This is a special case in which the file is '.'
+ if filename == path.rstrip('/'):
+ return filename
+
+ if not filename.startswith(self._repo_folder):
+ raise ValueError(f"{path2} must not be in folder above root repo folder {self._repo_folder}")
+
+ if force_path_as_root and not filename.startswith(path):
+ raise RuntimeError(f"{path2} must not be in folder above {path}")
+
+        # Redundant second check via pathlib. We deliberately check again because we want to be extra secure 👾
+ if Path(self._repo_folder).resolve(strict=True) not in Path(path, path2).resolve(strict=True).parents:
+ raise ValueError(f"{path2} must not be in folder above root repo folder {self._repo_folder}")
+
+ if force_path_as_root and Path(path).resolve(strict=True) not in Path(path, path2).resolve(strict=True).parents:
+ raise ValueError(f"{path2} must not be in folder above {path}")
+
+
+ if os.path.exists(filename):
+ return filename
+
+ raise FileNotFoundError(f"{path2} in {path} not found")
+
+
+
+ def initialize_folder(self, path):
+ shutil.rmtree(path, ignore_errors=True)
+ Path(path).mkdir(parents=True, exist_ok=True)
+
+ def save_notes_runner(self):
+ if not self._run_id:
+ return # Nothing to do, but also no hard error needed
+
+ print(TerminalColors.HEADER, '\nSaving notes: ', TerminalColors.ENDC, self.__notes_helper.get_notes())
+ self.__notes_helper.save_to_db(self._run_id)
+
+ def clear_caches(self):
+ subprocess.check_output(['sync'])
+
+ if platform.system() == 'Darwin':
+ return
+        # 3 instructs the kernel to drop page caches AND inode caches
+ subprocess.check_output(['sudo', '/usr/sbin/sysctl', '-w', 'vm.drop_caches=3'])
+
+ def check_system(self, mode='start'):
+ print(TerminalColors.HEADER, '\nChecking system', TerminalColors.ENDC)
+
+ if self._skip_system_checks:
+ print("System check skipped")
+ return
+
+        if mode == 'start':
+            system_checks.check_start()
+        else:
+            raise RuntimeError(f"Unknown mode for system check: {mode}")
+
+
+ def checkout_repository(self):
+ print(TerminalColors.HEADER, '\nChecking out repository', TerminalColors.ENDC)
+
+ if self._uri_type == 'URL':
+            # always remove the folder if a URL is provided, because the -v directory binding always creates it
+            # no check, because it might fail when the directory is missing due to a manual delete
+ if self._branch:
+ print(f"Branch specified: {self._branch}")
+ # git clone -b <branchname> --single-branch <remote-repo-url>
+ subprocess.run(
+ [
+ 'git',
+ 'clone',
+ '--depth', '1',
+ '-b', self._branch,
+ '--single-branch',
+ '--recurse-submodules',
+ '--shallow-submodules',
+ self._uri,
+ self._repo_folder
+ ],
+ check=True,
+ capture_output=True,
+ encoding='UTF-8',
+ )
+ else:
+ subprocess.run(
+ [
+ 'git',
+ 'clone',
+ '--depth', '1',
+ '--single-branch',
+ '--recurse-submodules',
+ '--shallow-submodules',
+ self._uri,
+ self._repo_folder
+ ],
+ check=True,
+ capture_output=True,
+ encoding='UTF-8'
+ ) # always name target-dir repo according to spec
+
+ else:
+ if self._branch:
+ # we never want to checkout a local directory to a different branch as this might also be the GMT directory itself and might confuse the tool
+ raise RuntimeError('Specified --branch but using local URI. Did you mean to specify a github url?')
+ # If the provided uri is a symlink we need to resolve it.
+ path = os.path.realpath(self._uri)
+ self.__working_folder = self._repo_folder = path
+
+ self._branch = subprocess.check_output(['git', 'branch', '--show-current'], cwd=self._repo_folder, encoding='UTF-8').strip()
+
+ git_repo_root = subprocess.check_output(['git', 'rev-parse', '--show-toplevel'], cwd=self._repo_folder, encoding='UTF-8').strip()
+ if git_repo_root != self._repo_folder:
+ raise RuntimeError('Supplied folder through --uri is not the root of the git repository. Please only supply the root folder and then the target directory through --filename')
+
+ # we can safely do this, even with problematic folders, as the folder can only be a local unsafe one when
+ # running in CLI mode
+
+ self._commit_hash, self._commit_timestamp = get_repo_info(self.join_paths(self._repo_folder, self._commit_hash_folder))
+
+    # This method loads the yml file and ensures that includes work and are secure.
+    # It uses the tagging infrastructure provided by https://pyyaml.org/wiki/PyYAMLDocumentation
+    # Inspired by https://github.com/tanbro/pyyaml-include, which we can't use as it does no
+    # security checking and has no option to select a sub-key when including
+ def load_yml_file(self):
+ #pylint: disable=too-many-ancestors
+ runner_join_paths = self.join_paths
+ class Loader(yaml.SafeLoader):
+ def __init__(self, stream):
+ # We need to find our own root as the Loader is instantiated in PyYaml
+ self._root = os.path.split(stream.name)[0]
+ super().__init__(stream)
+
+ def include(self, node):
+ # We allow two types of includes
+ # !include <filename> => ScalarNode
+ # and
+ # !include <filename> <selector> => SequenceNode
+ if isinstance(node, yaml.nodes.ScalarNode):
+ nodes = [self.construct_scalar(node)]
+ elif isinstance(node, yaml.nodes.SequenceNode):
+ nodes = self.construct_sequence(node)
+ else:
+                    raise ValueError("We don't support mapping nodes at this time")
+ try:
+ filename = runner_join_paths(self._root, nodes[0], force_path_as_root=True)
+ except RuntimeError as exc:
+ raise ValueError(f"Included compose file \"{nodes[0]}\" may only be in the same directory as the usage_scenario file as otherwise relative context_paths and volume_paths cannot be mapped anymore") from exc
+
+ with open(filename, 'r', encoding='utf-8') as f:
+ # We want to enable a deep search for keys
+                    def recursive_lookup(k, d):
+                        if k in d:
+                            return d[k]
+                        for v in d.values():
+                            if isinstance(v, dict):
+                                result = recursive_lookup(k, v)
+                                if result is not None:
+                                    return result
+                        return None
+
+ # We can use load here as the Loader extends SafeLoader
+ if len(nodes) == 1:
+ # There is no selector specified
+ return yaml.load(f, Loader)
+
+ return recursive_lookup(nodes[1], yaml.load(f, Loader))
+
+ Loader.add_constructor('!include', Loader.include)
+
+ usage_scenario_file = self.join_paths(self._repo_folder, self._original_filename)
+
+ # We set the working folder now to the actual location of the usage_scenario
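+        # e.g. filename 'subdir/usage_scenario.yml' -> __working_folder_rel becomes 'subdir' and __working_folder the absolute path to that directory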
+ if '/' in self._original_filename:
+ self.__working_folder_rel = self._original_filename.rsplit('/', 1)[0]
+ self.__working_folder = usage_scenario_file.rsplit('/', 1)[0]
+ #self.__filename = usage_scenario_file.rsplit('/', 1)[1] # we currently do not use this variable
+ print("Working folder changed to ", self.__working_folder)
+
+
+ with open(usage_scenario_file, 'r', encoding='utf-8') as fp:
+ # We can use load here as the Loader extends SafeLoader
+ yml_obj = yaml.load(fp, Loader)
+ # Now that we have parsed the yml file we need to check for the special case in which we have a
+ # compose-file key. In this case we merge the data we find under this key but overwrite it with
+ # the data from the including file.
+
+ # We need to write our own merge method as dict.update doesn't do a "deep" merge
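+            # e.g. merge_dicts({'a': {'x': 1}}, {'a': {'y': 2}}) -> {'a': {'x': 1, 'y': 2}}; on conflicting non-dict keys dict2 wins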
+            def merge_dicts(dict1, dict2):
+                if isinstance(dict1, dict):
+                    for k, v in dict2.items():
+                        if k in dict1 and isinstance(v, dict) and isinstance(dict1[k], dict):
+                            merge_dicts(dict1[k], v)
+                        else:
+                            dict1[k] = v
+                return dict1
+
+ new_dict = {}
+ if 'compose-file' in yml_obj.keys():
+ for k,v in yml_obj['compose-file'].items():
+ if k in yml_obj:
+ new_dict[k] = merge_dicts(v,yml_obj[k])
+ else: # just copy over if no key exists in usage_scenario
+ yml_obj[k] = v
+
+ del yml_obj['compose-file']
+
+ yml_obj.update(new_dict)
+
+        # If a service is defined as None we remove it. This way a compose file can define all the various
+        # services while the usage_scenario selectively disables them. This is quite useful when creating
+        # benchmarking scripts where you want all options in the compose file but not in each benchmark.
+        # The cleaner way would be to handle an empty service key throughout the code, but that would get
+        # quite messy, so we chose to remove such services right at the start.
+ for key in [sname for sname, content in yml_obj.get('services', {}).items() if content is None]:
+ del yml_obj['services'][key]
+
+ self._usage_scenario = yml_obj
+
+ def initial_parse(self):
+
+ self.load_yml_file()
+
+ schema_checker = SchemaChecker(validate_compose_flag=True)
+ schema_checker.check_usage_scenario(self._usage_scenario)
+
+ print(TerminalColors.HEADER, '\nHaving Usage Scenario ', self._usage_scenario['name'], TerminalColors.ENDC)
+ print('From: ', self._usage_scenario['author'])
+ print('Description: ', self._usage_scenario['description'], '\n')
+
+ if self._allow_unsafe:
+ print(TerminalColors.WARNING, arrows('Warning: Runner is running in unsafe mode'), TerminalColors.ENDC)
+
+ if self._usage_scenario.get('architecture') is not None and self._architecture != self._usage_scenario['architecture'].lower():
+ raise RuntimeError(f"Specified architecture does not match system architecture: system ({self._architecture}) != specified ({self._usage_scenario.get('architecture')})")
+
+ self._sci['R_d'] = self._usage_scenario.get('sci', {}).get('R_d', None)
+
+ def prepare_docker(self):
+ # Disable Docker CLI hints (e.g. "What's Next? ...")
+ os.environ['DOCKER_CLI_HINTS'] = 'false'
+
+ def check_running_containers(self):
+ result = subprocess.run(['docker', 'ps' ,'--format', '{{.Names}}'],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ check=True, encoding='UTF-8')
+ for line in result.stdout.splitlines():
+            for running_container in line.split(','): # a container can report multiple names separated by commas, so we check each one individually
+ for service_name in self._usage_scenario.get('services', {}):
+ if 'container_name' in self._usage_scenario['services'][service_name]:
+ container_name = self._usage_scenario['services'][service_name]['container_name']
+ else:
+ container_name = service_name
+
+ if running_container == container_name:
+ raise PermissionError(f"Container '{container_name}' is already running on system. Please close it before running the tool.")
+
+ def populate_image_names(self):
+ for service_name, service in self._usage_scenario.get('services', {}).items():
+            if not service.get('image', None): # image is a non-mandatory field. But we need it, so we generate a temporary name
+ if self._dev_cache_build:
+ service['image'] = f"{service_name}"
+ else:
+ service['image'] = f"{service_name}_{random.randint(500000,10000000)}"
+
+ def remove_docker_images(self):
+ if self._dev_cache_build:
+ return
+
+ print(TerminalColors.HEADER, '\nRemoving all temporary GMT images', TerminalColors.ENDC)
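+        # shell=True is needed here for the pipe; the grep restricts removal to images we tagged ourselves with the gmt_run_tmp suffix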
+ subprocess.run(
+ 'docker images --format "{{.Repository}}:{{.Tag}}" | grep "gmt_run_tmp" | xargs docker rmi -f',
+ shell=True,
+ stderr=subprocess.DEVNULL, # to suppress showing of stderr
+ check=False,
+ )
+
+ if self._full_docker_prune:
+ print(TerminalColors.HEADER, '\nStopping and removing all containers, build caches, volumes and images on the system', TerminalColors.ENDC)
+ subprocess.run('docker ps -aq | xargs docker stop', shell=True, check=False)
+ subprocess.run('docker images --format "{{.ID}}" | xargs docker rmi -f', shell=True, check=False)
+ subprocess.run(['docker', 'system', 'prune' ,'--force', '--volumes'], check=True)
+ elif self._docker_prune:
+            print(TerminalColors.HEADER, '\nRemoving all unassociated build caches, networks, volumes and stopped containers on the system', TerminalColors.ENDC)
+ subprocess.run(['docker', 'system', 'prune' ,'--force', '--volumes'], check=True)
+ else:
+            print(TerminalColors.WARNING, arrows('Warning: GMT is not instructed to prune docker images and build caches. \nWe recommend setting --docker-prune to remove build caches and anonymous volumes, because otherwise your disk will fill up very quickly. If you also want to measure the network I/O delay for pulling images and have a dedicated measurement machine, please set --full-docker-prune'), TerminalColors.ENDC)
+
+    '''
+    A machine will always register in the database on run.
+    This means that it will write its machine_id and machine_description to the machines table
+    and then link itself in the runs table accordingly.
+    '''
+ def register_machine_id(self):
+ config = GlobalConfig().config
+ if config['machine'].get('id') is None \
+ or not isinstance(config['machine']['id'], int) \
+ or config['machine'].get('description') is None \
+ or config['machine']['description'] == '':
+ raise RuntimeError('You must set machine id and machine description')
+
+ machine = Machine(machine_id=config['machine'].get('id'), description=config['machine'].get('description'))
+ machine.register()
+
+ def initialize_run(self):
+ config = GlobalConfig().config
+
+ gmt_hash, _ = get_repo_info(GMT_ROOT_DIR)
+
+        # There are two ways we get hardware info. Things that do not require root we get through a
+        # direct method call. Things that require root privileges we call as a subprocess with sudo. The
+        # install.sh script should have added that script to the sudoers file.
+ machine_specs = hardware_info.get_default_values()
+
+ if len(hardware_info_root.get_root_list()) > 0:
+ ps = subprocess.run(['sudo', '/usr/bin/python3', '-m', 'lib.hardware_info_root'], stdout=subprocess.PIPE, cwd=GMT_ROOT_DIR, check=True, encoding='UTF-8')
+ machine_specs_root = json.loads(ps.stdout)
+ machine_specs.update(machine_specs_root)
+
+ measurement_config = {}
+
+        measurement_config['settings'] = {k: v for k, v in config['measurement'].items() if k != 'metric_providers'} # filter out the static metric_providers list, as it might contain providers not relevant for the platform we are running on
+ measurement_config['providers'] = utils.get_metric_providers(config) # get only the providers relevant to our platform
+ measurement_config['allowed_run_args'] = self._allowed_run_args
+ measurement_config['disabled_metric_providers'] = self._disabled_metric_providers
+ measurement_config['sci'] = self._sci
+
+
+        # We issue a fetch_one() instead of a query() here, because we want to get back the RUN_ID
+ self._run_id = DB().fetch_one("""
+ INSERT INTO runs (
+ job_id, name, uri, branch, filename,
+ commit_hash, commit_timestamp, runner_arguments,
+ machine_specs, measurement_config,
+ usage_scenario, gmt_hash,
+ machine_id, user_id, created_at
+ )
+ VALUES (
+ %s, %s, %s, %s, %s,
+ %s, %s, %s,
+ %s, %s,
+ %s, %s,
+ %s, %s, NOW()
+ )
+ RETURNING id
+ """, params=(
+ self._job_id, self._name, self._uri, self._branch, self._original_filename,
+ self._commit_hash, self._commit_timestamp, json.dumps(self._arguments),
+ escape(json.dumps(machine_specs), quote=False), json.dumps(measurement_config),
+ escape(json.dumps(self._usage_scenario), quote=False), gmt_hash,
+ GlobalConfig().config['machine']['id'], self._user_id,
+ ))[0]
+ return self._run_id
+
+ def import_metric_providers(self):
+ if self._dev_no_metrics:
+ print(TerminalColors.HEADER, '\nSkipping import of metric providers', TerminalColors.ENDC)
+ return
+
+ config = GlobalConfig().config
+
+ print(TerminalColors.HEADER, '\nImporting metric providers', TerminalColors.ENDC)
+
+ metric_providers = utils.get_metric_providers(config)
+
+ if not metric_providers:
+ print(TerminalColors.WARNING, arrows('No metric providers were configured in config.yml. Was this intentional?'), TerminalColors.ENDC)
+ return
+
+ subprocess.run(["docker", "info"], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, encoding='UTF-8', check=True)
+
+ for metric_provider in metric_providers: # will iterate over keys
+ module_path, class_name = metric_provider.rsplit('.', 1)
+ module_path = f"metric_providers.{module_path}"
+ conf = metric_providers[metric_provider] or {}
+
+ if class_name in self._disabled_metric_providers:
+ print(TerminalColors.WARNING, arrows(f"Not importing {class_name} as disabled per user settings"), TerminalColors.ENDC)
+ continue
+
+ print(f"Importing {class_name} from {module_path}")
+ module = importlib.import_module(module_path)
+
+ if self._skip_system_checks:
+ metric_provider_obj = getattr(module, class_name)(**conf, skip_check=True)
+ print(f"Configuration is {conf}; skip_check=true")
+ else:
+ metric_provider_obj = getattr(module, class_name)(**conf)
+ print(f"Configuration is {conf}")
+
+ self.__metric_providers.append(metric_provider_obj)
+
+ if hasattr(metric_provider_obj, 'get_docker_params'):
+ services_list = ",".join(list(self._usage_scenario.get('services', {}).keys()))
+ self.__docker_params += metric_provider_obj.get_docker_params(no_proxy_list=services_list)
+
+
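+        # sort the providers so that those whose class name contains 'rapl' come first in the list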
+ self.__metric_providers.sort(key=lambda item: 'rapl' not in item.__class__.__name__.lower())
+
+ def download_dependencies(self):
+ if self._dev_cache_build:
+ print(TerminalColors.HEADER, '\nSkipping downloading dependencies', TerminalColors.ENDC)
+ return
+
+ print(TerminalColors.HEADER, '\nDownloading dependencies', TerminalColors.ENDC)
+ subprocess.run(['docker', 'pull', 'gcr.io/kaniko-project/executor:latest'], check=True)
+
+ def get_build_info(self, service):
+ if isinstance(service['build'], str):
+ # If build is a string we can assume the short form
+ context = service['build']
+ dockerfile = 'Dockerfile'
+ else:
+ context = service['build'].get('context', '.')
+ dockerfile = service['build'].get('dockerfile', 'Dockerfile')
+
+ return context, dockerfile
+
+ def clean_image_name(self, name):
+ # clean up image name for problematic characters
+ name = re.sub(r'[^A-Za-z0-9_]', '', name)
+ # only lowercase letters are allowed for tags
+ name = name.lower()
+ name = f"{name}_gmt_run_tmp"
+ return name
+
+ def build_docker_images(self):
+ print(TerminalColors.HEADER, '\nBuilding Docker images', TerminalColors.ENDC)
+
+ # Create directory /tmp/green-metrics-tool/docker_images
+ temp_dir = f"{self._tmp_folder}/docker_images"
+ self.initialize_folder(temp_dir)
+
+ # technically the usage_scenario needs no services and can also operate on an empty list
+ # This use case is when you have running containers on your host and want to benchmark some code running in them
+ for _, service in self._usage_scenario.get('services', {}).items():
+ # minimal protection from possible shell escapes.
+ # since we use subprocess without shell we should be safe though
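+            # e.g. an image name like 'ubuntu:$(rm -rf /)' or one containing quotes, backticks or '..' is rejected here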
+ if re.findall(r'(\.\.|\$|\'|"|`|!)', service['image']):
+ raise ValueError(f"In scenario file the builds contains an invalid image name: {service['image']}")
+
+ tmp_img_name = self.clean_image_name(service['image'])
+
+            # For developer repeat runs, check if the docker image has already been built
+ try:
+ subprocess.run(['docker', 'inspect', '--type=image', tmp_img_name],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ encoding='UTF-8',
+ check=True)
+                # The image exists in the build cache, so skip the build
+ print(f"Image {service['image']} exists in build cache. Skipping build ...")
+ continue
+ except subprocess.CalledProcessError:
+ pass
+
+ if 'build' in service:
+ context, dockerfile = self.get_build_info(service)
+ print(f"Building {service['image']}")
+ self.__notes_helper.add_note({'note': f"Building {service['image']}", 'detail_name': '[NOTES]', 'timestamp': int(time.time_ns() / 1_000)})
+
+                # Make sure the context and Dockerfile exist and are not trying to escape some root. We don't need the return values
+ context_path = self.join_paths(self.__working_folder, context)
+ self.join_paths(context_path, dockerfile)
+
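+                # Build with kaniko inside a container: the repo is mounted read-only, the resulting image is exported as a tar into temp_dir (--tar-path) and never pushed to a registry (--no-push)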
+ docker_build_command = ['docker', 'run', '--rm',
+ '-v', '/workspace',
+ '-v', f"{self._repo_folder}:/tmp/repo:ro", # this is the folder where the usage_scenario is!
+ '-v', f"{temp_dir}:/output",
+ 'gcr.io/kaniko-project/executor:latest',
+ f"--dockerfile=/tmp/repo/{self.__working_folder_rel}/{context}/{dockerfile}",
+ '--context', f'dir:///tmp/repo/{self.__working_folder_rel}/{context}',
+ f"--destination={tmp_img_name}",
+ f"--tar-path=/output/{tmp_img_name}.tar",
+ '--cleanup=true',
+ '--no-push']
+
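+                # splice any provider-supplied docker parameters (e.g. proxy settings) right after 'docker run'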
+ if self.__docker_params:
+ docker_build_command[2:2] = self.__docker_params
+
+ print(' '.join(docker_build_command))
+
+ if self._measurement_total_duration:
+ ps = subprocess.run(docker_build_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='UTF-8', timeout=self._measurement_total_duration, check=False)
+ else:
+ ps = subprocess.run(docker_build_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='UTF-8', check=False)
+
+ if ps.returncode != 0:
+ print(f"Error: {ps.stderr} \n {ps.stdout}")
+ raise OSError(f"Docker build failed\nStderr: {ps.stderr}\nStdout: {ps.stdout}")
+
+ # import the docker image locally
+ image_import_command = ['docker', 'load', '-q', '-i', f"{temp_dir}/{tmp_img_name}.tar"]
+ print(' '.join(image_import_command))
+ ps = subprocess.run(image_import_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='UTF-8', check=False)
+
+ if ps.returncode != 0 or ps.stderr != "":
+ print(f"Error: {ps.stderr} \n {ps.stdout}")
+ raise OSError("Docker image import failed")
+
+ else:
+ print(f"Pulling {service['image']}")
+ self.__notes_helper.add_note({'note':f"Pulling {service['image']}" , 'detail_name': '[NOTES]', 'timestamp': int(time.time_ns() / 1_000)})
+ ps = subprocess.run(['docker', 'pull', service['image']], stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='UTF-8', check=False)
+
+ if ps.returncode != 0:
+ print(f"Error: {ps.stderr} \n {ps.stdout}")
+ if __name__ == '__main__':
+ print(TerminalColors.OKCYAN, '\nThe docker image could not be pulled. Since you are working locally we can try looking in your local images. Do you want that? (y/N).', TerminalColors.ENDC)
+ if sys.stdin.readline().strip().lower() == 'y':
+ try:
+ subprocess.run(['docker', 'inspect', '--type=image', service['image']],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ encoding='UTF-8',
+ check=True)
+ print('Docker image found locally. Tagging now for use in cached runs ...')
+                            except subprocess.CalledProcessError as exc:
+                                raise OSError(f"Docker pull failed and image does not exist locally. Is your image name correct and are you connected to the internet: {service['image']}") from exc
+ else:
+ raise OSError(f"Docker pull failed. Is your image name correct and are you connected to the internet: {service['image']}")
+ else:
+ raise OSError(f"Docker pull failed. Is your image name correct and are you connected to the internet: {service['image']}")
+
+ # tagging must be done in pull and local case, so we can get the correct container later
+ subprocess.run(['docker', 'tag', service['image'], tmp_img_name], check=True)
+
+
+        # Delete the temporary directory /tmp/green-metrics-tool/docker_images
+ shutil.rmtree(temp_dir)
+
+ def save_image_and_volume_sizes(self):
+
+ for _, service in self._usage_scenario.get('services', {}).items():
+ tmp_img_name = self.clean_image_name(service['image'])
+
+            # Sadly, on macOS this reports bogus values that do not align with the "docker images" size info ...
+ output = subprocess.check_output(
+ f"docker image inspect {tmp_img_name} " + '--format={{.Size}}',
+ shell=True,
+ encoding='UTF-8',
+ )