utils.aws

aws

Common code useful for AWS functions.

Example Usage: from utils import aws from utils.aws import ssm_get_parameter

   1#!/usr/bin/env python3
   2"""
   3aws
   4
   5Common code useful for AWS functions.
   6
   7Example Usage:
   8    from utils import aws
   9    from utils.aws import ssm_get_parameter
  10"""
  11import contextlib
  12import boto3
  13import json
  14import os
  15import sys
  16import base64
  17import time
  18import typing
  19try:
  20    import loggy
  21except ImportError:
  22    from utils import loggy
  23try:
  24    import docker
  25except ImportError:
  26    from utils import docker
  27try:
  28    from .release import get_version, get_source_branch
  29except ImportError:
  30    from utils.release import get_version, get_source_branch
  31# try:
  32#     from .common import subprocess_long as _long_run, remove_empty_from_dict
  33# except ImportError:
  34#     from utils.common import subprocess_long as _long_run, remove_empty_from_dict
  35
  36from subprocess_tee import run as _run
  37
  38# init = False
  39# _sessions = None
  40# _sessions_task = None
  41# _sessions_build_com = None
  42# _sessions_build_gov = None
  43# _region = None
  44# _sessions = Sessions()
  45ECR_BASE_COM = {
  46    "registry_id": "575815261832",
  47    "region": "us-east-1"
  48    }
  49ECR_BASE_GOV = {
  50    "registry_id": "823420400518",
  51    "region": "us-gov-east-1"
  52    }
  53
  54
  55class AwsCreds():
  56    access_key = None
  57    secret_access_key = None
  58    region = None
  59
  60
  61class AwsSession():
  62    session = None
  63    creds = None
  64    name = None
  65
  66    def __init__(self, name):
  67        self.creds = AwsCreds()
  68        self.name = name
  69        self.session = None
  70
  71
  72class AwsSessions():
  73    __session = AwsSession("default")
  74    __session_task = AwsSession("task")
  75    __session_build_com = AwsSession("build_com")
  76    __session_build_gov = AwsSession("build_gov")
  77    __session_ecr_build_com = AwsSession("ecr_build_com")
  78    __session_ecr_build_gov = AwsSession("ecr_build_gov")
  79    __region = None
  80
  81    def __init__(self):
  82        self.__init_session()
  83        self.name = self.__session.name
  84        self.region = self.__session.creds.region
  85        self.session = self.__session
  86
  87    def get_session(self) -> boto3.Session:
  88        """
  89        get_session()
  90
  91        Get a reusable boto3.Session() to run your own boto3 commands into aws.
  92        """
  93        return self.__session.session
  94
  95    def get_creds(self) -> AwsCreds:
  96        """
  97        get_creds()
  98
  99        Get a reusable AwsCreds() to run your own CLI commands into aws.
 100        """
 101        return self.__session.creds
 102
 103    def __get_session_build_com(self) -> AwsSession:
 104        if self.__session_build_com.session is None:
 105            loggy.info("aws.__get_session_build_com(): session is None. Attempting to make one.")
 106            self.__init_internal_build_sessions()
 107        return self.__session_build_com
 108
 109    def __get_session_build_gov(self) -> AwsSession:
 110        if self.__session_build_gov.session is None:
 111            loggy.info("aws.__get_session_build_gov(): session is None. Attempting to make one.")
 112            self.__init_internal_build_sessions()
 113        return self.__session_build_gov
 114
 115    def __get_session_ecr_build_com(self) -> AwsSession:
 116        if self.__session_ecr_build_com.session is None:
 117            loggy.info("aws.__get_session_ecr_build_com(): session is None. Attempting to make one.")
 118            self.__init_internal_ecr_session()
 119        return self.__session_ecr_build_com
 120
 121    def __get_creds_ecr_build_com(self) -> AwsCreds:
 122        if self.__session_ecr_build_com.creds is None:
 123            loggy.info("aws.__get_session_ecr_build_com(): creds is None. Attempting to make one.")
 124            self.__init_internal_ecr_session()
 125        return self.__session_ecr_build_com.creds
 126
 127    def __get_session_ecr_build_gov(self) -> AwsSession:
 128        if self.__session_ecr_build_gov.session is None:
 129            loggy.info("aws.__get_session_ecr_build_gov(): session is None. Attempting to make one.")
 130            self.__init_internal_ecr_session()
 131        return self.__session_ecr_build_gov
 132
 133    def __get_creds_ecr_build_gov(self) -> AwsCreds:
 134        if self.__session_ecr_build_gov.creds is None:
 135            loggy.info("aws.__get_session_ecr_build_gov(): creds is None. Attempting to make one.")
 136            self.__init_internal_ecr_session()
 137        return self.__session_ecr_build_gov.creds
 138
 139    def __get_task_region(self) -> str:
 140        """
 141        __get_task_region()
 142
 143        Get the region of the task container from metadata
 144        """
 145        loggy.info("aws.AwsSessions().__get_task_region: Getting region from task metadata")
 146        _r = "us-east-1"
 147        if os.environ.get('ECS_CONTAINER_METADATA_URI'):
 148            output = _run("curl -s ${ECS_CONTAINER_METADATA_URI}/task", check=True, shell=True)
 149            task_arn = json.loads(output.stdout)
 150            _r = task_arn['TaskARN'].split(':')[3]
 151
 152        return _r
 153
 154    def __init_session(self) -> AwsSession:
 155        """
 156        __init_session()
 157
 158        This function initializes a boto3 AWS session for use in boto3 clients, using
 159        the pipeline environment credentials.
 160
 161        Returns a reusable AwsSession() object
 162        """
 163        if self.__session and self.__session.session:
 164            return self.__session
 165
 166        self.__session.creds.region = os.environ.get('AWS_DEFAULT_REGION', 'us-east-1')
 167
 168        if os.environ.get('AWS_ACCESS_KEY_ID') and os.environ.get('AWS_SECRET_ACCESS_KEY'):
 169            # loggy.info("aws.init_session(): Generating boto3 default session")
 170            self.__session.creds.access_key = os.environ.get('AWS_ACCESS_KEY_ID')
 171            self.__session.creds.secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
 172            self.__session.creds.region = os.environ.get('AWS_DEFAULT_REGION', 'us-east-1')
 173
 174            self.__session.session = boto3.Session(
 175                aws_access_key_id=self.__session.creds.access_key,
 176                aws_secret_access_key=self.__session.creds.secret_access_key,
 177                region_name=self.__session.creds.region)
 178
 179        else:
 180            # loggy.info("aws.init_session(): boto3 default session set to None.")
 181            self.__session = AwsSession("default")
 182            return self.__session
 183
 184        _prime_the_session_pump = self.__session.session.client('sts').get_caller_identity()
 185
 186        # init = True
 187        _prime_the_session_pump = _prime_the_session_pump
 188        # self.__init_internal_sessions()
 189        # self.__init_internal_ecr_session()
 190        return self.__session
 191
 192    def __init_internal_ecr_session(self):
 193        """
 194        __init_internal_ecr_session()
 195
 196        This function creates reusable sessions for pushing to ecr based on the environment.
 197        This is meant to be an internal only function and does not return anything.
 198        """
 199
 200        if self.__session_ecr_build_com.session or self.__session_ecr_build_gov.session:
 201            return
 202
 203        secret_string = secrets_get_secret_string(name="/rekor/cloudops/cicd/svc_accounts/ecr-push", session=self.__get_session_build_com())
 204        # loggy.info("aws._init_internal_sessions(): Generating boto3 build com session")
 205        self.__session_ecr_build_com.creds.access_key = secret_string['access_key']
 206        self.__session_ecr_build_com.creds.secret_access_key = secret_string['secret_access_key']
 207        self.__session_ecr_build_com.creds.region = secret_string['region']
 208
 209        self.__session_ecr_build_com.session = boto3.Session(
 210            aws_access_key_id=self.__session_ecr_build_com.creds.access_key,
 211            aws_secret_access_key=self.__session_ecr_build_com.creds.secret_access_key,
 212            region_name=self.__session_ecr_build_com.creds.region
 213        )
 214        _prime_the_session_pump = self.__session_ecr_build_com.session.client('sts').get_caller_identity()
 215
 216        if 'gov' in os.environ.get('AWS_DEFAULT_REGION'):
 217            secret_string = secrets_get_secret_string(name="/rekor/cloudops/cicd/svc_accounts/ecr-push", session=self.__get_session_build_gov())
 218            # loggy.info("aws._init_internal_sessions(): Generating boto3 build com session")
 219            self.__session_ecr_build_gov.creds.access_key = secret_string['access_key']
 220            self.__session_ecr_build_gov.creds.secret_access_key = secret_string['secret_access_key']
 221            self.__session_ecr_build_gov.creds.region = secret_string['region']
 222
 223            self.__session_ecr_build_gov.session = boto3.Session(
 224                aws_access_key_id=self.__session_ecr_build_gov.creds.access_key,
 225                aws_secret_access_key=self.__session_ecr_build_gov.creds.secret_access_key,
 226                region_name=self.__session_ecr_build_gov.creds.region
 227            )
 228            _prime_the_session_pump = self.__session_ecr_build_gov.session.client('sts').get_caller_identity()
 229
 230        _prime_the_session_pump = _prime_the_session_pump
 231        return
 232
 233    def __init_internal_build_sessions(self):
 234        """
 235        __init_internal_build_sessions()
 236
 237        This function creates reusable sessions for _task, _build_com and/or _build_gov.
 238        This is meant to be an internal only function and does not return anything.
 239        """
 240        if self.__session_task.session or self.__session_build_com.session or self.__session_build_gov.session:
 241            return
 242
 243        with self.__unset_aws_envs():
 244            """
 245            Generate a session using the container task IAM role
 246            """
 247            loggy.info("aws.__init_internal_build_sessions(): Generating boto3 task session")
 248            self.__session_task.session = boto3.Session(region_name=self.__get_task_region())
 249            _prime_the_session_pump = self.__session_task.session.client('sts').get_caller_identity()
 250            # loggy.info("aws._init_internal_sessions(): boto3 task session: " + str(_prime_the_session_pump))
 251            self.__session_task.creds = None
 252            secret_string = secrets_get_secret_string(name="/rekor/cloudops/cicd/svc_accounts/gocd", session=self.__session_task)
 253
 254        # loggy.info("aws._init_internal_sessions(): Generating boto3 build com session")
 255
 256        self.__session_build_com.creds.access_key = secret_string['access_key']
 257        self.__session_build_com.creds.secret_access_key = secret_string['secret_access_key']
 258        self.__session_build_com.creds.region = secret_string['region']
 259
 260        """
 261        Generate a session for the build commercial account
 262        """
 263        self.__session_build_com.session = boto3.Session(
 264            aws_access_key_id=self.__session_build_com.creds.access_key,
 265            aws_secret_access_key=self.__session_build_com.creds.secret_access_key,
 266            region_name=self.__session_build_com.creds.region
 267        )
 268        _prime_the_session_pump = self.__session_build_com.session.client('sts').get_caller_identity()
 269
 270        if 'gov' in os.environ.get('AWS_DEFAULT_REGION', 'none'):
 271            # loggy.info("aws._init_internal_sessions(): Generating boto3 build gov session")
 272            secret_string = secrets_get_secret_string(name="/rekor/cloudops/cicd/svc_accounts/gocd_gov", session=self.__session_task)
 273
 274            self.__session_build_gov.creds.access_key = secret_string['access_key']
 275            self.__session_build_gov.creds.secret_access_key = secret_string['secret_access_key']
 276            self.__session_build_gov.creds.region = secret_string['region']
 277
 278            """
 279            Generate a session for the build gov account
 280            """
 281            self.__session_build_gov.session = boto3.Session(
 282                aws_access_key_id=self.__session_build_gov.creds.access_key,
 283                aws_secret_access_key=self.__session_build_gov.creds.secret_access_key,
 284                region_name=self.__session_build_gov.creds.region
 285            )
 286            _prime_the_session_pump = self.__session_build_gov.session.client('sts').get_caller_identity()
 287        else:
 288            # loggy.info("aws._init_internal_sessions(): boto3 build gov session set to None.")
 289            self.__session_build_gov.session = None
 290            self.__session_build_gov.creds = None
 291
 292        _prime_the_session_pump = _prime_the_session_pump
 293        # loggy.info("aws._init_internal_sessions(): Primed the session pumps: " + str(_prime_the_session_pump))
 294
 295        return
 296
 297    @contextlib.contextmanager
 298    def __unset_aws_envs(self, **environ):
 299        """
 300        __unset_aws_envs()
 301
 302        Temporarily unset aws environment variables.
 303
 304        >>> with _unset_aws_envs():
 305        ...   "AWS_ACCESS_KEY_ID" in os.environ
 306        False
 307        ...   "AWS_SECRET_ACCESS_KEY" in os.environ
 308        False
 309        ...   "AWS_DEFAULT_REGION" in os.environ
 310        False
 311
 312        >>> "AWS_ACCESS_KEY_ID" in os.environ
 313        AFXYZXYZXYZ
 314
 315        :type environ: dict[str, unicode]
 316        """
 317        old_environ = dict(os.environ)
 318        if os.environ.get("AWS_ACCESS_KEY_ID"):
 319            # loggy.info("aws._unset_aws_envs(): Temp Deleting AWS_ACCESS_KEY_ID from ENV")
 320            del os.environ["AWS_ACCESS_KEY_ID"]
 321        if os.environ.get("AWS_SECRET_ACCESS_KEY"):
 322            # loggy.info("aws._unset_aws_envs(): Temp Deleting AWS_SECRET_ACCESS_KEY from ENV")
 323            del os.environ["AWS_SECRET_ACCESS_KEY"]
 324        if os.environ.get("AWS_DEFAULT_REGION"):
 325            # loggy.info("aws._unset_aws_envs(): Temp Deleting AWS_SECRET_ACCESS_KEY from ENV")
 326            del os.environ["AWS_DEFAULT_REGION"]
 327
 328        try:
 329            # loggy.info("aws._unset_aws_envs(): Yielding back temp modified environment")
 330            yield
 331        finally:
 332            # loggy.info("aws._unset_aws_envs(): Clearing temp environ changes and returning original environ")
 333            os.environ.clear()
 334            os.environ.update(old_environ)
 335
 336    """
 337    Interal SSM functions
 338    """
 339
 340    def ssm_get_parameter(self, name: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None):
 341        """
 342        ssm_get_parameter()
 343
 344        Get an SSM Parameter Value from the build com or gov account
 345
 346        name: String containing param friendly name or arn. Will convert arn into friendly name before use.
 347        region: String defaults to AWS_DEFAULT_REGION or us-east-1
 348        session: aws.Sessions() will use a different session to build the client, default is _session
 349        """
 350
 351        _s = self.__get_session_build_com() if session is None else session
 352        _r = _s.region_name if region is None else region
 353        if 'gov' in _r:
 354            _s = self.__get_session_build_gov() if session is None else session
 355
 356        loggy.info(f"aws.AwsSessions().ssm_get_parameter(): BEGIN (using session named: {_s.name})")
 357
 358        return ssm_get_parameter(name=name, session=_s, region=_r)
 359
 360    """
 361    Internal ECR functions
 362    """
 363
 364    def ecr_generate_fqcn(self, container: str) -> [str, str]:
 365        """
 366        ecr_generate_fqcn()
 367
 368        Generate a fully qualified aws docker container string for the build account
 369        (i.e. 552324424.dkr.ecr.us-east-1.amazonaws.com/mirrored/timothy)
 370
 371        container: String representing container name:tag
 372
 373        Returns: String container and String tag (None if tag doesn't exist)
 374        """
 375        _s = self.__get_session_ecr_build_com()
 376        _r = ecr_get_region()
 377
 378        loggy.info(f"aws.AwsSessions().ecr_generate_fqcn(): BEGIN (using session named: {_s.name})")
 379        return ecr_generate_fqcn(container=container, session=_s, region=_r)
 380
 381    def ecr_tag_to_build(self, container: str, tag_list: list) -> bool:
 382        """
 383        ecr_tag_to_build()
 384
 385        Add a Tag to an existing remote ecr container.
 386        Example: ecr_tag(container="123456789.dkr.ecr.us-east-1.amazonaws.com/mirrored/timothy:1234", tag="latest")
 387
 388        container: String containing existing remote container with tag "container:tag"
 389        tag_list: String List containing new tags to add to the local container
 390        """
 391        _s = self.__get_session_ecr_build_com()
 392        _r = ecr_get_region()
 393
 394        loggy.info(f"aws.AwsSessions().ecr_tag_to_build(): BEGIN (using session named: {_s.name})")
 395        for tag in tag_list:
 396            if not ecr_tag(container=container, tag=tag, session=_s, region=_r):
 397                loggy.info(f"aws.AwsSessions().ecr_tag_to_build(): Failed to put tag {tag}")
 398                return False
 399
 400        return True
 401
 402    def ecr_promote_to_gov(self, container: str, tag: str, push_all_tags: typing.Optional[bool] = True) -> bool:
 403        """
 404        ecr_promote_to_gov()
 405
 406        Pull container from ECR Commercial and Push it into ECR GovCloud.
 407        NOTE: This will ONLY work from a gov pipeline!
 408
 409        container: String containing existing remote container with tag "container"
 410        tag: String containing existing tag for container
 411        push_all_tags: bool True/False defaults to True to push all tags for this container:tag pair
 412
 413        Returns: True/False
 414        """
 415        if 'gov' not in os.environ.get('AWS_DEFAULT_REGION'):
 416            loggy.info("aws.AwsSession().ecr_promote_to_gov(): THIS IS NOT A GOV PIPELINE! RETURNING FALSE.")
 417            return False
 418
 419        _s_from = self.__get_session_ecr_build_com()
 420        _r_from = ecr_get_region()
 421
 422        _s_to = self.__get_session_ecr_build_gov()
 423        _r_to = ecr_get_region(_s_to)
 424        _reg_id_to = ecr_get_registry_id(_s_to)
 425
 426        loggy.info(f"aws.AwsSessions().ecr_promote_to_gov(): BEGIN (using sessions named: {_s_from.name} and {_s_to.name})")
 427
 428        _c, _t = self.ecr_pull_from_build(container, tag)
 429
 430        # Grab all of the tags for this image
 431        # aws --profile build-com ecr describe-images --repository-name 'warp/api' --image-ids imageTag=prod_rc | jq .imageDetails[0].imageTags
 432        _details = ecr_describe_image(container=_c, tag=_t, session=_s_from, region=_r_from)
 433        _tag_list = _details['imageDetails'][0]['imageTags']
 434        if not ecr_login(registry_id=_reg_id_to, session=_s_to, region=_r_to):
 435            loggy.info("aws.AwsSessions().ecr_promote_to_gov(): Failed to log into the gov build ECR")
 436            return False
 437
 438        return self.ecr_push_and_scan(container=_c, tag=_t, tag_list=_tag_list, session=_s_to)
 439
 440    def ecr_pull_from_build(self, container: str, tag: str) -> [str, str]:
 441        """
 442        ecr_pull_from_build()
 443
 444        Pull an ECR container from the build account.
 445
 446        container: String containing existing remote container
 447        tag: String containing tag of remote container
 448
 449        Returns: Tuple [str, str] First str is full ECR name of container, second str is tag
 450        """
 451        _s = self.__get_session_ecr_build_com()
 452        _ri = ecr_get_registry_id()
 453        _r = ecr_get_region()
 454
 455        loggy.info(f"aws.AwsSessions().ecr_pull_from_build(): BEGIN (using session named: {_s.name})")
 456
 457        if not ecr_login(registry_id=_ri, session=_s, region=_r):
 458            return None, None
 459
 460        return ecr_pull(container=container, tag=tag, session=_s, region=_r)
 461
 462    def ecr_push_and_scan(self, container: str, tag: str, tag_list: typing.Optional[list] = None, session: typing.Optional[AwsSession] = None) -> bool:
 463        """
 464        ecr_push_and_scan()
 465
 466        Push a new container to build account ECR.
 467        This is only allowed from develop or hotfix branches.
 468
 469        container: String representing local container name
 470        tag: String with existing local tag for container name
 471        tag_list: Optional String List of additional tags to push with the container
 472        session: will use a different to build the client, default is build session
 473
 474        Returns: True/False
 475        """
 476        _b = get_source_branch()
 477
 478        if 'develop' not in _b and 'hotfix' not in _b and 'gov' not in os.environ.get('AWS_DEFAULT_REGION'):
 479            # check if image exists already. If it doesn't, we cannot make a new one. If it does we can promote it.
 480            loggy.info(f"aws.AwsSessions().ecr_push_and_scan(): Cannot push from branch ({_b}). Only develop and hotfix allowed.")
 481            sys.exit(f"aws.AwsSessions().ecr_push_and_scan(): Cannot push from branch ({_b}). Only develop and hotfix allowed.")
 482
 483        _s = self.__get_session_ecr_build_com() if session is None else session
 484        _ri = ecr_get_registry_id(session=_s)
 485        _r = ecr_get_region(session=_s)
 486
 487        loggy.info(f"aws.AwsSessions().ecr_push_and_scan(): BEGIN (using session named: {_s.name})")
 488        try:
 489            ecr_login(registry_id=_ri, session=_s, region=_r)
 490
 491            #
 492            # We only want to scan the first tag we push.
 493            #
 494            _container, _tag = ecr_generate_fqcn(container=f"{container}:{tag}", session=_s, region=_r)
 495
 496            loggy.info(f"aws.AwsSessions().ecr_push_and_scan(): Pushing local container ({container}:{tag}) to remote ({_container}:{_tag})")
 497
 498            _temp_tag = f"{get_version()}.SCAN"
 499
 500            loggy.info("aws.AwsSessions().ecr_push_and_scan(): TAG FOR SCAN")
 501            loggy.info(f"docker tag {container}:{_tag} {_container}:{_temp_tag}")
 502            if not docker.docker("tag", f"{container}:{_tag}", f"{_container}:{_temp_tag}"):
 503                return False
 504
 505            loggy.info("aws.AwsSessions().ecr_push_and_scan(): PUSH TAG FOR SCAN")
 506            if not docker.docker("push", f"{_container}:{_temp_tag}"):
 507                return False
 508
 509            #
 510            # ECR Scan
 511            #
 512            if ecr_scan(container=container, tag=_temp_tag, session=_s, region=_r):
 513                #
 514                # Push the original local tag into ECR
 515                #
 516                loggy.info(f"aws.AwsSessions().ecr_push_and_scan(): Post Scan - Pushing original tag ({tag}) using manifest")
 517                if not ecr_tag(container=f"{_container}:{_temp_tag}", tag=tag, session=_s, region=_r):
 518                    return False
 519                # if not docker.docker("tag", f"{_container}:{_temp_tag}", f"{_container}:{tag}"):
 520                #     return False
 521                # if not docker.docker("push", f"{_container}:{tag}"):
 522                #     return False
 523
 524                if tag_list:
 525                    for t in tag_list:
 526                        loggy.info(f"aws.AwsSessions().ecr_push_and_scan(): Post Scan - Pushing tag ({t}) using manifest")
 527                        if not ecr_tag(container=f"{_container}:{_temp_tag}", tag=t, session=_s, region=_r):
 528                            return False
 529                        # if not docker.docker("tag", f"{_container}:{_temp_tag}", f"{_container}:{t}"):
 530                        #     return False
 531                        # if not docker.docker("push", f"{_container}:{t}"):
 532                        #     return False
 533
 534                #
 535                # Push initial set of tags for this container so we have them ready when we
 536                # build out any of our standard named environments.
 537                #
 538                for t in ecr_check_tag_exists(container=_container, tag_list=['develop', 'qa', 'stage', 'hotfix', 'demo', 'prod'], session=_s, region=_r):
 539                    loggy.info(f"aws.AwsSessions().ecr_push_and_scan(): CDK tag for ENV {t} does not exist. Pushing tag so CDK can deploy/create successfully.")
 540                    _build_stub_container, _build_stub_tag = ecr_pull_from_build(container='gocd/stub', tag='stub')
 541                    if not ecr_login(registry_id=_ri, session=_s, region=_r):
 542                        return False
 543                    _stub_container, _stub_tag = ecr_generate_fqcn(container="gocd/stub:stub", session=_s, region=_r)
 544                    if not docker.docker("tag", f"{_build_stub_container}:{_build_stub_tag}", f"{_container}:{t}"):
 545                        return False
 546                    if not docker.docker("push", f"{_container}:{t}"):
 547                        return False
 548
 549                # else:
 550                #     loggy.info("aws.AwsSessions().ecr_push_and_scan(): No more tags to push")
 551            else:
 552                # Remove the original tag and add a FAILED tag to this container
 553                if not docker.docker("tag", f"{_container}:{_temp_tag}", f"{_container}:FAILED_SCAN"):
 554                    return False
 555                if not docker.docker("push", f"{_container}:FAILED_SCAN"):
 556                    return False
 557
 558                try:
 559                    client = _s.session.client('ecr', region_name=_r)
 560                    response = client.batch_delete_image(
 561                        repositoryName=ecr_strip_container_name(_container),
 562                        imageIds=[{
 563                            'imageTag': _temp_tag
 564                        }]
 565                    )
 566                except Exception as e:
 567                    loggy.info(f"aws.AwsSessions().ecr_push_and_scan(): Failed to delete the temp tag {_temp_tag} from container after scan failed. {str(e)}")
 568                    fail_code = response['failures'][0]['failureCode']
 569                    fail_reason = response['failures'][0]['failureReason']
 570                    loggy.info(f"Failure Code ({fail_code}). Failure Reason ({fail_reason})")
 571
 572                loggy.info("aws.AwsSessions().ecr_push_and_scan(): Container Scan failed. Check out pipeline results tab or ECR for Vulns.")
 573
 574        except Exception as e:
 575            loggy.error("aws.AwsSessions().ecr_push_and_scan(): Logging out of ECR repo.")
 576            ecr_logout(registry_id=_ri, session=_s, region=_r)
 577            sys.exit(f"aws.AwsSessions().ecr_push_and_scan(): Error: {str(e)}")
 578
 579        loggy.info("aws.AwsSessions().ecr_push_and_scan(): Logging out of ECR repo.")
 580        ecr_logout(registry_id=_ri, session=_s, region=_r)
 581
 582        return True
 583
 584
 585if os.environ.get('HOTFIX_HAMMER') and get_version() not in os.environ['HOTFIX_HAMMER']:
 586    loggy.error("aws.py: HOTFIX HAMMER HAS BEEN THROWN. Contact CloudOps to remove the hammer.")
 587    sys.exit("aws.py: HOTFIX HAMMER HAS BEEN THROWN. Contact CloudOps to remove the hammer.")
 588
 589_sessions = AwsSessions()
 590
 591
 592"""
 593Global Utils
 594"""
 595
 596
 597def get_aws_account_id(session: typing.Optional[AwsSession] = None) -> str:
 598    """
 599    get_aws_account_id()
 600
 601    Get the aws account ID using the local AWS_ACCESS_KEY_ID credentials.
 602
 603    session will use a different session to build the client, default is _sessions
 604    """
 605    global _sessions
 606    _s = _sessions.session if session is None else session
 607    loggy.info(f"aws.get_aws_account_id(): BEGIN (using session named: {_s.name})")
 608
 609    try:
 610        client = _s.session.client('sts')
 611        response = client.get_access_key_info(AccessKeyId=_s.creds.access_key)
 612        if 'Account' not in response:
 613            loggy.error("Error: Account ID not returned")
 614    except Exception as e:
 615        loggy.error("Error: " + str(e))
 616
 617    return response['Account']
 618
 619
 620def get_region(session: typing.Optional[AwsSession] = None) -> str:
 621    """
 622    get_region()
 623
 624    Get the aws region from the session.
 625
 626    session will use a different session to build the client, default is global _sessions
 627    """
 628    global _sessions
 629    _s = _sessions.session if session is None else session
 630    loggy.info(f"aws.get_region(): BEGIN (using session named: {_s.name})")
 631
 632    return _s.session.region_name
 633
 634
 635def get_session():
 636    """
 637    get_session()
 638
 639    Get the aws session that was instantiated on import. This will be tied to
 640    the local pipeline environment. Useful if you want to build out your own
 641    boto3 clients to run commands that aren't yet supported in this library.
 642
 643    returns: boto3.Session() object
 644    """
 645    global _sessions
 646    loggy.info(f"aws.get_session(): BEGIN (using session named: {_sessions.name})")
 647    return _sessions.session.session
 648
 649
 650"""
 651CloudFront Utils
 652"""
 653
 654
 655def cloudfront_create_invalidation(dist: str, items: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> str:
 656    """
 657    cloudfront_create_invalidation()
 658
 659    Create an invalidation on a CloudFront distribution.
 660
 661    dist: String Distribution ID
 662    items: String List of paths. (i.e. ["/*"])
 663    session: will use a different session to build the client, default is _sessions
 664    """
 665    global _sessions
 666
 667    _s = _sessions.session if session is None else session
 668    _r = _s.session.region_name if region is None else region
 669    loggy.info(f"aws.cloudfront_create_invalidation(): BEGIN (using session named: {_s.name})")
 670
 671    try:
 672        client = _s.session.client('cloudfront', region_name=_r)
 673        response = client.create_invalidation(
 674            DistributionId=dist,
 675            InvalidationBatch={
 676                'Paths': {
 677                    'Quantity': 1,
 678                    'Items': items
 679                },
 680                'CallerReference': str(time.time()).replace(".", "")
 681            }
 682        )
 683    except Exception as e:
 684        loggy.error("Error: " + str(e))
 685
 686    invalidation_id = response['Invalidation']['Id']
 687    return invalidation_id
 688
 689
 690"""
 691ECS Utils
 692"""
 693
 694
 695def ecs_deploy(clusterArn: str, serviceArn: str, tag: typing.Optional[str] = None, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> bool:
 696    """
 697    ecsDeploy.py
 698
 699    Deploy a new task definition to an existing ECS Cluster/Service
 700
 701    NOTE 1: This is NOT blue/green. Look at `ecsBlueGreenDeploy.py`
 702    NOTE 2: This will check if the clusterArn/serviceArn are NOT aws arns and instead
 703            use them as ssm parameter names to grab the values
 704
 705    clusterArn: String (Optional) Will use ECS_CLUSTER_ARN from os.environ as default
 706    serviceArn: String (Optional) Will use ECS_SERVICE_ARN from os.environ as default
 707    tag: String (Optional) - Will use `release.get_version()` as default
 708    session: AwsSession (Optional) will use a different session to build the client, default is _sessions
 709    region: String (Optional)
 710
 711    Returns: True/False
 712    """
 713    global _sessions
 714
 715    _s = _sessions.session if session is None else session
 716    _r = _s.session.region_name if region is None else region
 717    loggy.info(f"aws.ecsDeploy(): BEGIN (using session named: {_s.name})")
 718
 719    _TAG = tag if tag is not None else get_version()
 720    _CLUSTER_ARN = clusterArn
 721    _SERVICE_ARN = serviceArn
 722
 723    """
 724    If the _CLUSTER_ARN or _SERVICE_ARN are not in ARN format, consider them SSM Param names
 725    and use them to grab the ARNS out of SSM Params
 726    """
 727    _CLUSTER = _CLUSTER_ARN if ':' in _CLUSTER_ARN else ssm_get_parameter(name=_CLUSTER_ARN, session=_s, region=_r)
 728    _SERVICE = _SERVICE_ARN if ':' in _SERVICE_ARN else ssm_get_parameter(name=_SERVICE_ARN, session=_s, region=_r)
 729
 730    """
 731    go get the entire task definition for a service by name (might need the cluster too)
 732    """
 733    loggy.info("aws.ecsDeploy(): Looking up latest task definition for cluster/service")
 734    current_task_definition_arn = ecs_get_latest_task_definition_arn(cluster=_CLUSTER, service=_SERVICE, session=_s, region=_r)
 735    loggy.info("aws.ecsDeploy(): Storing the entire current task definition for rollback")
 736    current_task_definition = ecs_get_task_definition_from_arn(task_def_arn=current_task_definition_arn, session=_s, region=_r)
 737
 738    """
 739    This iterates through all current_task_definition.containers
 740    then for each container -> iterate through container.secrets
 741    for each secret -> find one where the key is "VERSION"
 742    if found, return the value which will be an ssm param arn
 743    """
 744    version_ssm_param_arn = ecs_get_version_param_name_from_task_def(task_def=current_task_definition)
 745
 746    """
 747    Get the currently deployed version number
 748    """
 749    old_version = ssm_get_parameter(name=version_ssm_param_arn, session=_s, region=_r)
 750
 751    """
 752    set the new version provided by the caller
 753    """
 754    new_version = _TAG
 755
 756    """
 757    This iterates over the containers again to set
 758    the new image in the container where
 759    we simply get the old image and replace the :{tag}
 760    before: docker.devops.rekor.io/blue/api:12345
 761    after: docker.devops.rekor.io/blue/api:$newVersion
 762    """
 763    new_task_definition = ecs_set_new_image_in_task_def(task_def=current_task_definition, version=new_version)
 764
 765    """
 766    Go register the next task def
 767    there should now be a new version of the task def
 768    """
 769    new_task_definition_arn = ecs_register_task_definition_revision(task_def=new_task_definition, session=_s, region=_r)
 770
 771    """
 772    update the ssm param with the new tag.
 773    This function should fail gracefully as not all appilcations use an SSM param
 774    to store its version. Scout uses the git commit hash for version awareness.
 775    """
 776    ssm_put_parameter(name=version_ssm_param_arn, value=_TAG, session=_s, region=_r)
 777
 778    """
 779    deploy new task def to the service
 780    """
 781    ecs_deploy_new_task_definition(cluster=_CLUSTER, service=_SERVICE, task_def_arn=new_task_definition_arn, session=_s, region=_r)
 782
 783    deploy_status = ecs_wait_services_stable(cluster=_CLUSTER, service=_SERVICE, session=_s, region=_r)
 784    if not deploy_status:
 785        loggy.info("ecsDeploy(): Deploy FAILED! Rolling back to original task def!")
 786        # Roll back procedures by rolling back the version param and setting the service back to the original task def
 787        ssm_put_parameter(name=version_ssm_param_arn, value=old_version, session=_s, region=_r)
 788        ecs_deploy_new_task_definition(cluster=_CLUSTER, service=_SERVICE, task_def_arn=current_task_definition_arn, session=_s, region=_r)
 789        deploy_status = ecs_wait_services_stable(cluster=_CLUSTER, service=_SERVICE, session=_s, region=_r)
 790        if not deploy_status:
 791            raise Exception("aws.ecsDeploy(): Rolling back to original task def failed!")
 792
 793        ecs_deregister_task_def(task_def=new_task_definition_arn, session=_s, region=_r)
 794        loggy.info("aws.ecsDeploy(): Deploy Failed! Rolled back to original task def.")
 795        return False
 796
 797    loggy.info("aws.ecsDeploy(): Deploy Successful")
 798    return True
 799
 800
 801def ecs_get_latest_task_definition_arn(cluster: str, service: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> str:
 802    """
 803    ecs_get_latest_task_definition_arn()
 804
 805    Get the latest task definition arn for a particular service in a particular cluster
 806
 807    cluster: String containing ECS Cluster ARN
 808    service: String containing ECS Service ARN
 809
 810    Returns: String containing task def arn
 811    """
 812    global _sessions
 813    _s = _sessions.session if session is None else session
 814    _r = _s.session.region_name if region is None else region
 815    loggy.info(f"aws.ecs_get_latest_task_definition_arn(): BEGIN (using session named: {_s.name})")
 816
 817    loggy.info(f"aws.ecs_get_latest_task_definition_arn(): Searching for latest task_definition_arn in cluster/service ({cluster} / {service})")
 818
 819    task_def_arn = None
 820    try:
 821        client = _s.session.client('ecs', region_name=_r)
 822        response = client.describe_services(
 823            cluster=cluster,
 824            services=[
 825                service
 826            ]
 827        )
 828        task_def_arn = response['services'][0]['taskDefinition']
 829    except Exception as e:
 830        loggy.error(f"aws.ecs_get_latest_task_definition_arn(): Error: {str(e)}")
 831        raise
 832
 833    return task_def_arn
 834
 835
 836def ecs_get_task_definition_from_arn(task_def_arn: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> dict:
 837    """
 838    ecs_get_task_definition_from_arn()
 839
 840    Get clonable task definition (json) from a task definition arn
 841
 842    task_def_arn: String containing task def arn
 843
 844    Returns: dict containing enough of the task def to clone it
 845    """
 846    global _sessions
 847    _s = _sessions.session if session is None else session
 848    _r = _s.session.region_name if region is None else region
 849    loggy.info(f"aws.ecs_get_task_definition_from_arn(): BEGIN (using session named: {_s.name})")
 850
 851    loggy.info(f"aws.ecs_get_task_definition_from_arn(): Reading in full task definition from: {task_def_arn}")
 852
 853    try:
 854        client = _s.session.client('ecs', region_name=_r)
 855        response = client.describe_task_definition(
 856            taskDefinition=task_def_arn
 857        )
 858
 859        task_def = response['taskDefinition']
 860    except Exception as e:
 861        loggy.error(f"aws.ecs_get_task_definition_from_arn(): Error: {str(e)}")
 862        raise
 863
 864    # remove_props_list = [
 865    #     "taskDefinitionArn",
 866    #     "revision",
 867    #     "status",
 868    #     "requiresAttributes",
 869    #     "compatibilities",
 870    #     "runtimePlatform",
 871    #     "inferenceAccelerators",
 872    #     "registeredAt",
 873    #     "deregisteredAt",
 874    #     "registeredBy",
 875    #     "ephemeralStorage"
 876    # ]
 877    # for prop in remove_props_list:
 878    #     if prop in task_def:
 879    #         del task_def[prop]
 880
 881    # now we remove any empty values or lists
 882    # task_def = remove_empty_from_dict(task_def)
 883
 884    return task_def
 885
 886
 887def __ecs_check_version_in_secrets(secrets: dict) -> str:
 888    """
 889    """
 890    param_name = None
 891    for secret in secrets:
 892        if secret.get('name') and 'VERSION' in secret['name']:
 893            param_name = secret.get('valueFrom')
 894            break
 895    return param_name
 896
 897
 898def ecs_get_version_param_name_from_task_def(task_def: dict) -> str:
 899    """
 900    ecs_get_version_param_name_from_task_def()
 901
 902    Get the `version` SSM param name from the task definition
 903
 904    task_def: dict
 905
 906    Return: String containing version SSM param name
 907    """
 908    loggy.info(f"aws.ecs_get_version_param_name_from_task_def(): Searching for VERSION ssm parameter arn in containers inside of {task_def}")
 909
 910    param_name = None
 911    if not task_def.get('containerDefinitions'):
 912        raise Exception("aws.ecs_get_version_param_name_from_task_def(): Could not locate containerDefinitions inside of the task_def dict")
 913        return param_name
 914
 915    for container in task_def['containerDefinitions']:
 916        if container.get('secrets'):
 917            param_name = __ecs_check_version_in_secrets(container['secrets'])
 918            if param_name:
 919                break
 920
 921    return param_name
 922
 923
 924# def ecs_save_task_def_to_json_file(current_task_definition, old_task_definition_file_name):
 925#     """
 926#     ecs_save_task_def_to_json_file()
 927#
 928#     Save our task definition to a json file.
 929#     """
 930
 931
 932def ecs_set_new_image_in_task_def(task_def: dict, version: str) -> dict:
 933    """
 934    ecs_set_new_image_in_task_def()
 935
 936    Set/replace the tag/version of the containers in a task def hashmap and returns a new hashmap
 937
 938    task_def: dict containing task definition
 939    version: String containiing new container tag/version
 940
 941    Returns: dict task_def
 942    """
 943    if not task_def.get('containerDefinitions'):
 944        raise Exception("aws.ecs_set_new_image_in_task_def(): containerDefinitions not found in task_def.")
 945
 946    for container in task_def['containerDefinitions']:
 947        if not container.get('image'):
 948            raise Exception("aws.ecs_set_new_image_in_task_def(): container image value not found in returned list.")
 949            return {}
 950
 951        _image, _original_image_version = container['image'].split(':')
 952        _image = f"{_image}:{version}"
 953        loggy.info(f"aws.ecs_set_new_image_in_task_def(): Changing image version ({_original_image_version}) to ({version}) for container named ({container['name']}): new image is ${_image}")
 954        container['image'] = _image
 955
 956    return task_def
 957
 958
 959def ecs_register_task_definition_revision(task_def: dict, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> dict:
 960    """
 961    ecs_register_new_task_definition()
 962
 963    Register a new task definition in ECS
 964
 965    task_def: dict of a task definition to register
 966
 967    Returns: dict of new task_def
 968    """
 969    global _sessions
 970    _s = _sessions.session if session is None else session
 971    _r = _s.session.region_name if region is None else region
 972    loggy.info(f"aws.ecs_register_new_task_definition(): BEGIN (using session named: {_s.name})")
 973
 974    loggy.info("aws.ecs_register_new_task_definition(): Registering new task definition.")
 975
 976    try:
 977        client = _s.session.client('ecs', region_name=_r)
 978        if 'secrets' in task_def['containerDefinitions'][0]:
 979            response = client.register_task_definition(
 980                family=task_def['family'],
 981                containerDefinitions=task_def['containerDefinitions'],
 982                volumes=task_def['volumes'],
 983                executionRoleArn=task_def['executionRoleArn'],
 984                requiresCompatibilities=task_def['requiresCompatibilities'],
 985                networkMode=task_def['networkMode'],
 986                cpu=task_def['cpu'],
 987                memory=task_def['memory']
 988            )
 989        else:
 990            response = client.register_task_definition(
 991                family=task_def['family'],
 992                containerDefinitions=task_def['containerDefinitions'],
 993                volumes=task_def['volumes'],
 994                requiresCompatibilities=task_def['requiresCompatibilities'],
 995                networkMode=task_def['networkMode'],
 996                cpu=task_def['cpu'],
 997                memory=task_def['memory']
 998            )
 999
1000        task_def = response['taskDefinition']['taskDefinitionArn']
1001    except Exception as e:
1002        loggy.error(f"aws.ecs_register_new_task_definition(): Error: {str(e)}")
1003        return {}
1004
1005    return task_def
1006
1007
1008def ecs_deploy_new_task_definition(cluster: str, service: str, task_def_arn: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> bool:
1009    """
1010    ecs_deploy_new_task_definition()
1011
1012    Deploy a task defition to a particular ECS Cluster/Service
1013
1014    cluster: String containing ECS Cluster Arn
1015    service: String containing ECS Service Arn
1016    task_def_arn: String containing task definition arn
1017
1018    Returns: True/False
1019    """
1020    global _sessions
1021    _s = _sessions.session if session is None else session
1022    _r = _s.session.region_name if region is None else region
1023    loggy.info(f"aws.ecs_deploy_new_task_definition(): BEGIN (using session named: {_s.name})")
1024
1025    loggy.info(f"aws.ecs_deploy_new_task_definition(): Deploying task defintion ({task_def_arn}) to cluster ({cluster} / service ({service}).")
1026
1027    try:
1028        client = _s.session.client('ecs', region_name=_r)
1029        client.update_service(
1030            cluster=cluster,
1031            service=service,
1032            taskDefinition=task_def_arn
1033        )
1034
1035    except Exception as e:
1036        loggy.error(f"aws.ecs_deploy_new_task_definition(): Error: {str(e)}")
1037        return False
1038
1039    return True
1040
1041
1042def ecs_wait_services_stable(cluster: str, service: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> bool:
1043    """
1044    ecs_wait_services_stable()
1045
1046    Wait up to 10 minutes for ECS services to become stable
1047
1048    cluster: String containing cluster arn
1049    service: String containing service arn
1050
1051    Returns: True/False
1052    """
1053    global _sessions
1054    _s = _sessions.session if session is None else session
1055    _r = _s.session.region_name if region is None else region
1056    loggy.info(f"aws.ecs_wait_services_stable(): BEGIN (using session named: {_s.name})")
1057
1058    loggy.info(f"aws.ecs_wait_services_stable(): Waiting for services to become stable on cluster ({cluster} / service ({service}).")
1059
1060    try:
1061        client = _s.session.client('ecs', region_name=_r)
1062        waiter = client.get_waiter('services_stable')
1063
1064        waiter.wait(
1065            cluster=cluster,
1066            services=[
1067                service
1068            ])
1069    except Exception as e:
1070        loggy.info(f"aws.ecs_wait_services_stable(): The services did not become stable: {str(e)}")
1071        raise
1072        return False
1073
1074    return True
1075
1076
1077def ecs_deregister_task_def(task_def_arn: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> bool:
1078    """
1079    ecs_deregister_task_def()
1080
1081    Deregister a task definition
1082
1083    task_def_arn: String containing task definition arn
1084
1085    Returns: True/False
1086    """
1087    global _sessions
1088    _s = _sessions.session if session is None else session
1089    _r = _s.session.region_name if region is None else region
1090    loggy.info(f"aws.ecs_deregister_task_def(): BEGIN (using session named: {_s.name})")
1091
1092    loggy.info(f"aws.ecs_deregister_task_def(): Deregistering task definition: {task_def_arn}")
1093
1094    try:
1095        client = _s.session.client('ecs', region_name=_r)
1096        response = client.deregister_task_definition(
1097            taskDefinition=task_def_arn
1098        )
1099        if not response.get('taskDefinition') or not response['taskDefinition'].get('deregisteredAt'):
1100            raise Exception
1101    except Exception as e:
1102        loggy.info(f"aws.ecs_deregister_task_def(): Failed to deregister task definition: {str(e)}")
1103        return False
1104
1105    return True
1106
1107
1108"""
1109ECR UTILS
1110"""
1111
1112
1113def ecr_get_manifest(container: str, tag: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> str:
1114    """
1115    ecr_get_manifest()
1116
1117    Get the image manifest for a remote container and tag
1118
1119    container: String representing container name
1120    tag: String tag to look up
1121    session: will use a different session to build the client, default is build session
1122    region: will use a different region to build the client, default is build region
1123
1124    Returns: String containing image manifest
1125    """
1126    global _sessions
1127    _s = _sessions.session if session is None else session
1128    _r = _s.session.region_name if region is None else region
1129    loggy.info(f"aws.ecr_get_manifest(): BEGIN (using session named: {_s.name})")
1130
1131    client = _s.session.client('ecr', region_name=_r)
1132
1133    manifest = None
1134    try:
1135        response = client.batch_get_image(
1136            repositoryName=ecr_strip_container_name(container),
1137            imageIds=[
1138                {
1139                    'imageTag': tag
1140                }
1141            ]
1142            # acceptedMediaTypes=['application/vnd.docker.distribution.manifest.v2+json'],
1143        )
1144        loggy.info(str(response))
1145        manifest = response['images'][0]['imageManifest']
1146        loggy.info(f"aws.ecr_get_manifest(): Found manifest {manifest}.")
1147    except Exception as e:
1148        loggy.info(f"aws.ecr_get_manifest(): Failed to retrieve manifest: {str(e)}")
1149        return manifest
1150
1151    return manifest
1152
1153
1154def ecr_put_image(container: str, tag: str, manifest: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> bool:
1155    """
1156    ecr_put_image()
1157
1158    Put an image manifest for a remote container and tag. Used for re-tagging an
1159    existing image
1160
1161    container: String representing container name
1162    tag: String new tag to add to image
1163    manifest: String manifest of image
1164    session: will use a different session to build the client, default is build session
1165    region: will use a different region to build the client, default is build region
1166
1167    Returns: True/False
1168    """
1169    global _sessions
1170    _s = _sessions.session if session is None else session
1171    _r = _s.session.region_name if region is None else region
1172    loggy.info(f"aws.ecr_put_image(): BEGIN (using session named: {_s.name})")
1173
1174    client = _s.session.client('ecr', region_name=_r)
1175
1176    try:
1177        client.put_image(
1178            repositoryName=ecr_strip_container_name(container),
1179            imageTag=tag,
1180            imageManifest=manifest
1181        )
1182        # loggy.info(str(response))
1183        loggy.info("aws.ecr_put_image(): Successfully put new image.")
1184    except Exception as e:
1185        if 'already exists' in str(e):
1186            loggy.info(f"aws.ecr_put_image(): Image already exists. Passing. {str(e)}")
1187            return True
1188        else:
1189            loggy.info(f"aws.ecr_put_image(): Failed to put image: {str(e)}")
1190            return False
1191
1192    return True
1193
1194
1195def ecr_tag_to_build(container: str, tag_list: list) -> bool:
1196    """
1197    ecr_tag_to_build()
1198
1199    Add a Tag to an existing remote ecr container.
1200    Example: ecr_tag(container="123456789.dkr.ecr.us-east-1.amazonaws.com/mirrored/timothy:1234", tag="latest")
1201
1202    This is the default and only path to get a container pushed into production.
1203
1204    NOTE 1: You will be logged out of any existing ECR repos. run `aws.ecr_login()` to establish a new connection
1205
1206    container: String containing existing remote container with tag "container:tag"
1207    tag_list: List containing new tags to add to the local container
1208
1209    Returns: True/False
1210    """
1211    global _sessions
1212    loggy.info(f"aws.ecr_tag_to_build(): BEGIN (using session named: {_sessions.name})")
1213    return _sessions.ecr_tag_to_build(container=container, tag_list=tag_list)
1214
1215
1216def ecr_tag(container: str, tag: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> bool:
1217    """
1218    ecr_tag()
1219
1220    Add a Tag to an existing remote ecr container.
1221    Example: ecr_tag(container="123456789.dkr.ecr.us-east-1.amazonaws.com/mirrored/timothy:1234", tag="latest")
1222
1223    container: String containing existing remote container with tag "container:tag"
1224    tag: String containing new tag to add to the local container
1225
1226    Returns: True/False
1227    """
1228    global _sessions
1229    _s = _sessions.session if session is None else session
1230    _r = ecr_get_region() if session is None else _s.session.region_name if region is None else region
1231    loggy.info(f"aws.ecr_tag(): BEGIN (using session named: {_s.name})")
1232
1233    loggy.info(f"aws.ecr_tag(): Tagging {container} with {tag}")
1234    if ':' not in container:
1235        raise Exception("aws.ecr_tag(): container must include tag")
1236
1237    _c, _t = ecr_generate_fqcn(container=container, session=_s, region=_r)
1238
1239    manifest = ecr_get_manifest(container=_c, tag=_t, session=_s, region=_r)
1240
1241    if ecr_put_image(container=_c, tag=tag, manifest=manifest, session=_s, region=_r):
1242        loggy.info("aws.ecr_tag(): Successfully added tag to image")
1243    else:
1244        loggy.info("aws.ecr_tag(): Failed to add new tag to image")
1245        return False
1246
1247    return True
1248
1249
1250def ecr_promote_to_gov(container: str, tag: str, push_all_tags: typing.Optional[bool] = True) -> bool:
1251    """
1252    ecr_promote_to_gov()
1253
1254    Pull container from ECR Commercial and Push it into ECR GovCloud.
1255    NOTE: This will ONLY work from a gov pipeline!
1256
1257    container: String containing existing remote container with tag "container"
1258    tag: String containing existing tag for container
1259    push_all_tags: bool True/False defaults to True to push all tags for this container:tag pair
1260
1261    Returns: True/False
1262    """
1263    global _sessions
1264    loggy.info(f"aws.ecr_promote_to_gov(): BEGIN (using session named: {_sessions.name})")
1265
1266    return _sessions.ecr_promote_to_gov(container, tag, push_all_tags)
1267
1268
1269def ecr_push_to_build(container: str, tag: str, tag_list: typing.Optional[list] = None) -> bool:
1270    """
1271    ecr_push_to_build()
1272
1273    Push a locally built container into our AWS build commercial account.
1274    This is the default and only path to get a container pushed into production.
1275
1276    NOTE 1: You must be running in either `develop` or `hotfix` branch.
1277    NOTE 2: You will be logged out of any existing ECR repos. run `aws.ecr_login()` to establish a new connection
1278
1279    container: String representing container name
1280    tag: String with existing local tag for container name
1281    tag_list: Optional String List of additional tags to push with the container
1282
1283    Returns: True/False
1284    """
1285    global _sessions
1286    loggy.info(f"aws.ecr_tag(): BEGIN (using session named: {_sessions.name})")
1287
1288    return _sessions.ecr_push_and_scan(container, tag, tag_list)
1289
1290
1291def ecr_get_registry_id(session: typing.Optional[AwsSession] = None) -> str:
1292    """
1293    ecr_get_registry_id()
1294
1295    Returns default registry_id (i.e. build account) unless a session is given.
1296
1297    session: AwsSession object to pull the registry_id from a session
1298    """
1299    if session:
1300        loggy.info(f"aws.ecr_get_registry_id(): BEGIN (using session named: {session.name})")
1301
1302    _registry_id = ECR_BASE_COM['registry_id'] if session is None else get_aws_account_id(session)
1303    return _registry_id
1304
1305
1306def ecr_get_region(session: typing.Optional[AwsSession] = None) -> str:
1307    """
1308    ecr_get_region()
1309
1310    Returns default registry region (i.e. build account) unless a session is given.
1311
1312    session: (Optional) String to pull the region from a session
1313    """
1314    if session:
1315        loggy.info(f"aws.ecr_get_region(): BEGIN (using session named: {session.name})")
1316
1317    _region = ECR_BASE_COM['region'] if session is None else get_region(session)
1318    return _region
1319
1320
1321def ecr_describe_image(container: str, tag: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> dict:
1322    """
1323    ecr_describe_image()
1324
1325    Returns a dict with the response from aws call to describe images
1326
1327    container: String of container
1328    tag: String of container tag
1329    session: will use this session to build the client, default is _sessions
1330    region: will use a specfic region to build the client, default is _sessions.region_name
1331    """
1332    global _sessions
1333    _s = _sessions.session if session is None else session
1334    _r = _s.session.region_name if region is None else region
1335    loggy.info(f"aws.ecr_describe_image(): BEGIN (using session named: {_s.name})")
1336
1337    client = _s.session.client('ecr', region_name=_r)
1338
1339    try:
1340        response = client.describe_images(
1341            repositoryName=ecr_strip_container_name(container),
1342            imageIds=[
1343                {
1344                    'imageTag': tag
1345                }
1346            ]
1347        )
1348        # loggy.info(str(response))
1349        loggy.info("aws.ecr_describe_image(): Successfully pulled image information.")
1350    except Exception as e:
1351        loggy.info(f"aws.ecr_describe_image(): Failed to pull image information: {str(e)}")
1352        return {}
1353
1354    if not response.get('imageDetails'):
1355        loggy.info(f"aws.ecr_describe_image(): Failed to pull image information. {str(response)}")
1356        return {}
1357
1358    return response
1359
1360
1361def ecr_logout(registry_id: typing.Optional[str] = None, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> None:
1362    """
1363    ecr_logout()
1364
1365    Logout from an ECR repo
1366
1367    registry_id: Authenticate to a different registry_id (aka AWS ECR)
1368    session: will use this session to build the client, default is _sessions
1369    region: will use a specfic region to build the client, default is _sessions.region_name
1370    """
1371    global _sessions
1372
1373    loggy.info("aws.ecr_logout(): BEGIN")
1374
1375    _s = _sessions.session if session is None else session
1376    _r = _s.session.region_name if region is None else region
1377    _reg = get_aws_account_id(_s) if registry_id is None else registry_id
1378    _repo = f"{_reg}.dkr.ecr.{_r}.amazonaws.com"
1379
1380    docker.logout(repo=_repo)
1381    return
1382
1383
1384def ecr_login(registry_id: typing.Optional[str] = None, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> bool:
1385    """
1386    ecr_login()
1387
1388    Authenticate Docker against an ECR repository.
1389
1390    Default: Authenticate using _sessions (current environment) to pull/push there.
1391
1392    registry_id: Authenticate to a different registry_id (aka AWS ECR)
1393    session: will use this session to build the client, default is _sessions
1394    region: will use a specfic region to build the client, default is _sessions.region_name
1395
1396    Returns: True/False
1397    """
1398    global _sessions
1399
1400    loggy.info("aws.ecr_login(): BEGIN")
1401
1402    _s = _sessions.session if session is None else session
1403    _r = _s.session.region_name if region is None else region
1404    _reg = get_aws_account_id(_s) if registry_id is None else registry_id
1405
1406    loggy.info(f"aws.ecr_login(): BEGIN (using session named: {_s.name})")
1407
1408    try:
1409        loggy.info(f"aws.ecr_login(): registry_id ({_reg}) region ({_r})")
1410        client = _s.session.client('ecr', region_name=_r)
1411        response = client.get_authorization_token(registryIds=[_reg])
1412    except Exception as e:
1413        loggy.error("Error: " + str(e))
1414        raise
1415
1416    try:
1417        auth = response["authorizationData"][0]
1418    except (IndexError, KeyError):
1419        raise RuntimeError("Unable to get authorization token from ECR!")
1420    except Exception as e:
1421        loggy.error("Error: " + str(e))
1422        raise
1423
1424    auth_token = base64.b64decode(auth["authorizationToken"]).decode()
1425    username, password = auth_token.split(":")
1426
1427    # cmd = [
1428    #     "login",
1429    #     "--username",
1430    #     username,
1431    #     "--password",
1432    #     password,
1433    #     auth["proxyEndpoint"],
1434    # ]
1435
1436    return docker.login(username=username, password=password, repo=auth["proxyEndpoint"])
1437
1438
1439def ecr_generate_build_fqcn(container: str) -> [str, str]:
1440    """
1441    ecr_generate_build_fqcn()
1442
1443    Generate a fully qualified aws docker container string for build account
1444    (i.e. 552324424.dkr.ecr.us-east-1.amazonaws.com/mirrored/timothy)
1445
1446    container: String representing container name:tag
1447
1448    Returns: String container and String tag (None if tag doesn't exist)
1449    """
1450    global _sessions
1451    loggy.info(f"aws.ecr_generate_build_fqcn(): BEGIN (using session named: {_sessions.name})")
1452    return _sessions.ecr_generate_fqcn(container=container)
1453
1454
1455def ecr_generate_fqcn(container: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> [str, str]:
1456    """
1457    ecr_generate_fqcn()
1458
1459    Generate a fully qualified aws docker container string based on current session Credentials
1460    (i.e. 552324424.dkr.ecr.us-east-1.amazonaws.com/mirrored/timothy)
1461
1462    container: String representing container name:tag
1463
1464    Returns: String container and String tag (None if tag doesn't exist)
1465    """
1466    global _sessions
1467
1468    if "dkr.ecr" in container:
1469        loggy.info(f"aws.ecr_generate_fqcn(): ECR URL already exists: {container}. Stripping container and creating a new ECR URL")
1470        container = ecr_strip_container_name(container=container)
1471        # return container.split(':')
1472
1473    _s = _sessions.session if session is None else session
1474    _r = ecr_get_region() if session is None else _s.session.region_name if region is None else region
1475    _reg = ecr_get_registry_id(_s)
1476
1477    loggy.info(f"aws.ecr_generate_fqcn(): BEGIN (using session named: {_s.name})")
1478
1479    loggy.info(f"aws.ecr_generate_fqcn(): Generated ECR URL: {_reg}.dkr.ecr.{_r}.amazonaws.com/{container}")
1480
1481    _ret = f"{_reg}.dkr.ecr.{_r}.amazonaws.com/{container}".split(':')
1482    if len(_ret) < 2:
1483        return _ret[0], None
1484
1485    return _ret[0], _ret[1]
1486
1487
1488def ecr_pull_from_build(container: str, tag: typing.Optional[str] = None) -> [str, str]:
1489    """
1490    ecr_pull_from_build()
1491
1492    Run a docker pull for a container:tag
1493
1494    container: String representing container name ("name:tag" is accepted)
1495    tag: String representing the tag of the container to pull (Optional)
1496
1497    NOTE: if tag is None and container does not include :tag in the String, we
1498    will append :latest to the container name
1499
1500    Returns: Tuple [str, str] First str is full ECR name of container, second str is tag
1501    """
1502    global _sessions
1503    loggy.info(f"aws.ecr_pull_from_build(): BEGIN (using session named: {_sessions.name})")
1504    return _sessions.ecr_pull_from_build(container=container, tag=tag)
1505
1506
1507def ecr_pull(container: str, tag: typing.Optional[str] = None, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> [str, str]:
1508    """
1509    ecr_pull()
1510
1511    Run a docker pull for a container:tag
1512    Ensure you run aws.ecr_login() first
1513
1514    container: String representing container name ("name:tag" is accepted)
1515    tag: String representing the tag of the container to pull (Optional)
1516
1517    NOTE: if tag is None and container does not include :tag in the String, we
1518    will append :latest to the container name
1519
1520    Returns: Tuple [str, str] First str is full ECR name of container, second str is tag
1521    """
1522    # global _sessions
1523    # _s = _sessions.session if session is None else session
1524    #
1525    # ecr_login(_s)
1526    # _container = container if ':' in container else f"{container}:latest" if tag is None else f"{container}:{tag}"
1527
1528    _c, _t = ecr_generate_fqcn(container=container, session=session, region=region)
1529    _t = tag if tag is not None else 'latest' if _t is None else _t
1530
1531    loggy.info(f"aws.ecr_pull(): Pulling container: {_c}:{_t}")
1532
1533    if not docker.docker("pull", f"{_c}:{_t}"):
1534        return None, None
1535
1536    return _c, _t
1537
1538
1539def ecr_check_tag_exists(container: str, tag_list: list, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> list:
1540    """
1541    ecr_check_tag_exists()
1542
1543    Check if tags exist for a container
1544
1545    container: String representing container name
1546    tag: String List of tags to check
1547    session: will use a different session to build the client, default is build session
1548    region: will use a different region to build the client, default is build region
1549
1550    Returns: String List of tags that are not found. Will return empty List on failure. (i.e. Fail Open)
1551    """
1552    global _sessions
1553    _s = _sessions.session if session is None else session
1554    _r = _s.session.region_name if region is None else region
1555
1556    loggy.info(f"aws.ecr_check_tag_exists(): BEGIN (using session named: {_s.name})")
1557
1558    client = _s.session.client('ecr', region_name=_r)
1559
1560    missing = tag_list
1561    try:
1562        response = client.describe_images(
1563            repositoryName=ecr_strip_container_name(container),
1564            filter={
1565                'tagStatus': 'TAGGED'
1566            }
1567        )
1568        loggy.info(str(response))
1569        for i in response['imageDetails']:
1570            for tag in tag_list:
1571                if tag in i['imageTags']:
1572                    loggy.info(f"aws.ecr_check_tag_exists(): Found tag {tag}. Removing from List.")
1573                    missing.remove(tag)
1574    except Exception as e:
1575        loggy.info(f"aws.ecr_check_tag_exists(): Failed to retrieve tags: {str(e)}")
1576        loggy.info("aws.ecr_check_tag_exists(): FAILING OPEN! Returning empty List.")
1577        return []
1578
1579    return missing
1580
1581
1582def ecr_push(container: str, tag: str, tag_list: typing.Optional[list] = None, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> bool:
1583    """
1584    ecr_push()
1585
1586    Push a new container to build account ECR
1587
1588    container: String representing container name
1589    tag: String with existing local tag for container name
1590    tag_list: Optional String List of additional tags to push with the container
1591    session: will use a different session to build the client, default is build session
1592
1593    Returns: True/False
1594    """
1595    global _sessions
1596    loggy.info(f"aws.ecr_push(): BEGIN (using session named: {_sessions.name})")
1597
1598    return _sessions.ecr_push(container, tag, tag_list, session, region)
1599
1600
1601def ecr_strip_container_name(container: str) -> str:
1602    """"
1603    ecr_strip_container_name()
1604
1605    Remove prefix of repo url from the repo name itself
1606
1607    container: String like "575815261832.dkr.ecr.******.amazonaws.com/mirrored/amazoncorretto"
1608
1609    Returns: String containing just container like "mirrored/amazoncorretto"
1610    """
1611    _new = container.split('/')
1612    del _new[0]
1613    return '/'.join(_new)
1614
1615
1616def ecr_scan(container: str, tag: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> bool:
1617    """
1618    ecr_scan()
1619
1620    Scan and wait for results of a container
1621
1622    container: String representing container name
1623    tag: String tags to scan
1624    session: will use a different to build the client, default is build session
1625
1626    Returns: True/False
1627    """
1628    global _sessions
1629    _s = _sessions.session if session is None else session
1630    _r = _s.session.region_name if region is None else region
1631
1632    loggy.info(f"aws.ecr_scan(): BEGIN (using session named: {_s.name})")
1633
1634    client = _s.session.client('ecr', region_name=_r)
1635    res = "IN_PROGRESS"
1636    _wait = 300
1637    try:
1638        while 'IN_PROGRESS' in res and _wait > 0:
1639            loggy.info(f"aws.ecr_scan(): Waiting up to 300 seconds for the ECR Scan of {container} with tag {tag} to complete.")
1640            response = client.describe_image_scan_findings(
1641                repositoryName=ecr_strip_container_name(container),
1642                imageId={
1643                    'imageTag': tag
1644                }
1645            )
1646            time.sleep(5)
1647            _wait -= 5
1648            res = response['imageScanStatus']['status']
1649
1650        loggy.info(f"aws.ecr_scan(): Scan completed with: {res}")
1651    except Exception as e:
1652        loggy.info(f"aws.ecr_scan(): Failed to get the scan status: {str(e)}")
1653        return False
1654
1655    """
1656    TODO: Check the scan for vulns
1657    """
1658
1659    return True
1660
1661
1662"""
1663SSM Utils
1664"""
1665
1666
1667def ssm_get_parameter_from_build(name: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> str:
1668    """
1669    ssm_get_parameter_from_build()
1670
1671    Get an SSM Parameter Value from the build account.
1672
1673    name: String containing param friendly name or arn. Will convert arn into friendly name before use.
1674    session: aws.Sessions() will use a different session to build the client, default is _sessions
1675    region: String defaults to AWS_DEFAULT_REGION or us-east-1
1676
1677    Returns String containing param value
1678    """
1679    global _sessions
1680    loggy.info(f"aws.ssm_get_parameter_from_build(): BEGIN (using session named: {_sessions.name})")
1681    return _sessions.ssm_get_parameter(name, session, region)
1682
1683
1684# def ssm_get_parameter(name, session=None, region=None, build=None):
1685def ssm_get_parameter(name: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> str:
1686    """
1687    ssm_get_parameter()
1688
1689    Get an SSM Parameter Value
1690
1691    name: String containing param friendly name or arn. Will convert arn into friendly name before use.
1692    region: String defaults to AWS_DEFAULT_REGION or us-east-1
1693    session: aws.Sessions() will use a different session to build the client, default is _sessions
1694
1695    Returns String containing param value
1696    """
1697    global _sessions
1698    _s = _sessions.session if session is None else session
1699    _r = _s.session.region_name if region is None else region
1700
1701    loggy.info(f"aws.ssm_get_parameter(): BEGIN (using session named: {_s.name})")
1702
1703    """
1704    This function only takes a name not an arn
1705    """
1706    name = name if ':parameter' not in name else name.split(':parameter')[1]
1707
1708    try:
1709        client = _s.session.client(service_name='ssm', region_name=_r)
1710        response = client.get_parameter(Name=name, WithDecryption=True)
1711
1712        return response['Parameter']['Value']
1713    except Exception as e:
1714        loggy.error("Error: " + str(e))
1715
1716    return None
1717
1718
1719def ssm_put_parameter(name: str,
1720                      value: str,
1721                      type: typing.Optional[str] = None,
1722                      session: typing.Optional[AwsSession] = None,
1723                      region: typing.Optional[str] = None,
1724                      KeyId: typing.Optional[str] = None) -> str:
1725    """
1726    ssm_put_parameter()
1727
1728    Create/Update an SSM Parameter
1729
1730    name and value are required
1731    type defaults to String
1732    region defaults to AWS_DEFAULT_REGION or us-east-1
1733    session will use a different session to build the client, default is _sessions
1734    KeyId is a string containing the KeyId for the encryption key to use if not default
1735
1736    Returns: String of new ssm param version
1737    """
1738    global _sessions
1739    _s = _sessions.session if session is None else session
1740    _r = _s.session.region_name if region is None else region
1741
1742    loggy.info(f"aws.ssm_put_parameter(): BEGIN (using session named: {_s.name})")
1743
1744    if not type:
1745        type = "String"
1746
1747    """
1748    This function only takes a name not an arn
1749    """
1750    name = name if ':parameter' not in name else name.split(':parameter')[1]
1751
1752    try:
1753        client = _s.session.client(service_name='ssm', region_name=_r)
1754        if KeyId:
1755            response = client.put_parameter(Name=name, Value=value, Type=type, Overwrite=True, KeyId=KeyId)
1756        else:
1757            response = client.put_parameter(Name=name, Value=value, Type=type, Overwrite=True)
1758
1759        return str(response['Version'])
1760    except Exception as e:
1761        loggy.error("Error: " + str(e))
1762
1763    return None
1764
1765
1766"""
1767SecretsManger Utils
1768"""
1769
1770
1771def secrets_get_secret_string(name: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> dict:
1772    """
1773    secrets_get_secret_string()
1774
1775    Retrieve sectret string value using ENV Variables as Credentials.
1776    This will retrieve the AWSCURRENT version.
1777
1778    name is required
1779    region defaults to AWS_DEFAULT_REGION or us-east-1
1780    session will use a different session to build the client, default is _sessions
1781
1782    Returns: dict containing secret string
1783    """
1784    global _sessions
1785    _s = _sessions.session if session is None else session
1786    _r = _s.session.region_name if region is None else region
1787
1788    loggy.info(f"aws.secrets_get_secret_string(): BEGIN (using session named: {_s.name})")
1789    loggy.info(f"aws.secrets_get_secret_string(): region name {_r}")
1790    try:
1791        client = _s.session.client(service_name='secretsmanager', region_name=_r)
1792
1793        response = client.get_secret_value(SecretId=name)
1794
1795        return json.loads(response['SecretString'])
1796    except Exception as e:
1797        loggy.error("Error: " + str(e))
1798        raise
1799
1800    return {}
class AwsCreds:
56class AwsCreds():
57    access_key = None
58    secret_access_key = None
59    region = None
AwsCreds()
access_key = None
secret_access_key = None
region = None
class AwsSession:
62class AwsSession():
63    session = None
64    creds = None
65    name = None
66
67    def __init__(self, name):
68        self.creds = AwsCreds()
69        self.name = name
70        self.session = None
AwsSession(name)
67    def __init__(self, name):
68        self.creds = AwsCreds()
69        self.name = name
70        self.session = None
session = None
creds = None
name = None
class AwsSessions:
 73class AwsSessions():
 74    __session = AwsSession("default")
 75    __session_task = AwsSession("task")
 76    __session_build_com = AwsSession("build_com")
 77    __session_build_gov = AwsSession("build_gov")
 78    __session_ecr_build_com = AwsSession("ecr_build_com")
 79    __session_ecr_build_gov = AwsSession("ecr_build_gov")
 80    __region = None
 81
 82    def __init__(self):
 83        self.__init_session()
 84        self.name = self.__session.name
 85        self.region = self.__session.creds.region
 86        self.session = self.__session
 87
 88    def get_session(self) -> boto3.Session:
 89        """
 90        get_session()
 91
 92        Get a reusable boto3.Session() to run your own boto3 commands into aws.
 93        """
 94        return self.__session.session
 95
 96    def get_creds(self) -> AwsCreds:
 97        """
 98        get_creds()
 99
100        Get a reusable AwsCreds() to run your own CLI commands into aws.
101        """
102        return self.__session.creds
103
104    def __get_session_build_com(self) -> AwsSession:
105        if self.__session_build_com.session is None:
106            loggy.info("aws.__get_session_build_com(): session is None. Attempting to make one.")
107            self.__init_internal_build_sessions()
108        return self.__session_build_com
109
110    def __get_session_build_gov(self) -> AwsSession:
111        if self.__session_build_gov.session is None:
112            loggy.info("aws.__get_session_build_gov(): session is None. Attempting to make one.")
113            self.__init_internal_build_sessions()
114        return self.__session_build_gov
115
116    def __get_session_ecr_build_com(self) -> AwsSession:
117        if self.__session_ecr_build_com.session is None:
118            loggy.info("aws.__get_session_ecr_build_com(): session is None. Attempting to make one.")
119            self.__init_internal_ecr_session()
120        return self.__session_ecr_build_com
121
122    def __get_creds_ecr_build_com(self) -> AwsCreds:
123        if self.__session_ecr_build_com.creds is None:
124            loggy.info("aws.__get_session_ecr_build_com(): creds is None. Attempting to make one.")
125            self.__init_internal_ecr_session()
126        return self.__session_ecr_build_com.creds
127
128    def __get_session_ecr_build_gov(self) -> AwsSession:
129        if self.__session_ecr_build_gov.session is None:
130            loggy.info("aws.__get_session_ecr_build_gov(): session is None. Attempting to make one.")
131            self.__init_internal_ecr_session()
132        return self.__session_ecr_build_gov
133
134    def __get_creds_ecr_build_gov(self) -> AwsCreds:
135        if self.__session_ecr_build_gov.creds is None:
136            loggy.info("aws.__get_session_ecr_build_gov(): creds is None. Attempting to make one.")
137            self.__init_internal_ecr_session()
138        return self.__session_ecr_build_gov.creds
139
140    def __get_task_region(self) -> str:
141        """
142        __get_task_region()
143
144        Get the region of the task container from metadata
145        """
146        loggy.info("aws.AwsSessions().__get_task_region: Getting region from task metadata")
147        _r = "us-east-1"
148        if os.environ.get('ECS_CONTAINER_METADATA_URI'):
149            output = _run("curl -s ${ECS_CONTAINER_METADATA_URI}/task", check=True, shell=True)
150            task_arn = json.loads(output.stdout)
151            _r = task_arn['TaskARN'].split(':')[3]
152
153        return _r
154
155    def __init_session(self) -> AwsSession:
156        """
157        __init_session()
158
159        This function initializes a boto3 AWS session for use in boto3 clients, using
160        the pipeline environment credentials.
161
162        Returns a reusable AwsSession() object
163        """
164        if self.__session and self.__session.session:
165            return self.__session
166
167        self.__session.creds.region = os.environ.get('AWS_DEFAULT_REGION', 'us-east-1')
168
169        if os.environ.get('AWS_ACCESS_KEY_ID') and os.environ.get('AWS_SECRET_ACCESS_KEY'):
170            # loggy.info("aws.init_session(): Generating boto3 default session")
171            self.__session.creds.access_key = os.environ.get('AWS_ACCESS_KEY_ID')
172            self.__session.creds.secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
173            self.__session.creds.region = os.environ.get('AWS_DEFAULT_REGION', 'us-east-1')
174
175            self.__session.session = boto3.Session(
176                aws_access_key_id=self.__session.creds.access_key,
177                aws_secret_access_key=self.__session.creds.secret_access_key,
178                region_name=self.__session.creds.region)
179
180        else:
181            # loggy.info("aws.init_session(): boto3 default session set to None.")
182            self.__session = AwsSession("default")
183            return self.__session
184
185        _prime_the_session_pump = self.__session.session.client('sts').get_caller_identity()
186
187        # init = True
188        _prime_the_session_pump = _prime_the_session_pump
189        # self.__init_internal_sessions()
190        # self.__init_internal_ecr_session()
191        return self.__session
192
193    def __init_internal_ecr_session(self):
194        """
195        __init_internal_ecr_session()
196
197        This function creates reusable sessions for pushing to ecr based on the environment.
198        This is meant to be an internal only function and does not return anything.
199        """
200
201        if self.__session_ecr_build_com.session or self.__session_ecr_build_gov.session:
202            return
203
204        secret_string = secrets_get_secret_string(name="/rekor/cloudops/cicd/svc_accounts/ecr-push", session=self.__get_session_build_com())
205        # loggy.info("aws._init_internal_sessions(): Generating boto3 build com session")
206        self.__session_ecr_build_com.creds.access_key = secret_string['access_key']
207        self.__session_ecr_build_com.creds.secret_access_key = secret_string['secret_access_key']
208        self.__session_ecr_build_com.creds.region = secret_string['region']
209
210        self.__session_ecr_build_com.session = boto3.Session(
211            aws_access_key_id=self.__session_ecr_build_com.creds.access_key,
212            aws_secret_access_key=self.__session_ecr_build_com.creds.secret_access_key,
213            region_name=self.__session_ecr_build_com.creds.region
214        )
215        _prime_the_session_pump = self.__session_ecr_build_com.session.client('sts').get_caller_identity()
216
217        if 'gov' in os.environ.get('AWS_DEFAULT_REGION'):
218            secret_string = secrets_get_secret_string(name="/rekor/cloudops/cicd/svc_accounts/ecr-push", session=self.__get_session_build_gov())
219            # loggy.info("aws._init_internal_sessions(): Generating boto3 build com session")
220            self.__session_ecr_build_gov.creds.access_key = secret_string['access_key']
221            self.__session_ecr_build_gov.creds.secret_access_key = secret_string['secret_access_key']
222            self.__session_ecr_build_gov.creds.region = secret_string['region']
223
224            self.__session_ecr_build_gov.session = boto3.Session(
225                aws_access_key_id=self.__session_ecr_build_gov.creds.access_key,
226                aws_secret_access_key=self.__session_ecr_build_gov.creds.secret_access_key,
227                region_name=self.__session_ecr_build_gov.creds.region
228            )
229            _prime_the_session_pump = self.__session_ecr_build_gov.session.client('sts').get_caller_identity()
230
231        _prime_the_session_pump = _prime_the_session_pump
232        return
233
234    def __init_internal_build_sessions(self):
235        """
236        __init_internal_build_sessions()
237
238        This function creates reusable sessions for _task, _build_com and/or _build_gov.
239        This is meant to be an internal only function and does not return anything.
240        """
241        if self.__session_task.session or self.__session_build_com.session or self.__session_build_gov.session:
242            return
243
244        with self.__unset_aws_envs():
245            """
246            Generate a session using the container task IAM role
247            """
248            loggy.info("aws.__init_internal_build_sessions(): Generating boto3 task session")
249            self.__session_task.session = boto3.Session(region_name=self.__get_task_region())
250            _prime_the_session_pump = self.__session_task.session.client('sts').get_caller_identity()
251            # loggy.info("aws._init_internal_sessions(): boto3 task session: " + str(_prime_the_session_pump))
252            self.__session_task.creds = None
253            secret_string = secrets_get_secret_string(name="/rekor/cloudops/cicd/svc_accounts/gocd", session=self.__session_task)
254
255        # loggy.info("aws._init_internal_sessions(): Generating boto3 build com session")
256
257        self.__session_build_com.creds.access_key = secret_string['access_key']
258        self.__session_build_com.creds.secret_access_key = secret_string['secret_access_key']
259        self.__session_build_com.creds.region = secret_string['region']
260
261        """
262        Generate a session for the build commercial account
263        """
264        self.__session_build_com.session = boto3.Session(
265            aws_access_key_id=self.__session_build_com.creds.access_key,
266            aws_secret_access_key=self.__session_build_com.creds.secret_access_key,
267            region_name=self.__session_build_com.creds.region
268        )
269        _prime_the_session_pump = self.__session_build_com.session.client('sts').get_caller_identity()
270
271        if 'gov' in os.environ.get('AWS_DEFAULT_REGION', 'none'):
272            # loggy.info("aws._init_internal_sessions(): Generating boto3 build gov session")
273            secret_string = secrets_get_secret_string(name="/rekor/cloudops/cicd/svc_accounts/gocd_gov", session=self.__session_task)
274
275            self.__session_build_gov.creds.access_key = secret_string['access_key']
276            self.__session_build_gov.creds.secret_access_key = secret_string['secret_access_key']
277            self.__session_build_gov.creds.region = secret_string['region']
278
279            """
280            Generate a session for the build gov account
281            """
282            self.__session_build_gov.session = boto3.Session(
283                aws_access_key_id=self.__session_build_gov.creds.access_key,
284                aws_secret_access_key=self.__session_build_gov.creds.secret_access_key,
285                region_name=self.__session_build_gov.creds.region
286            )
287            _prime_the_session_pump = self.__session_build_gov.session.client('sts').get_caller_identity()
288        else:
289            # loggy.info("aws._init_internal_sessions(): boto3 build gov session set to None.")
290            self.__session_build_gov.session = None
291            self.__session_build_gov.creds = None
292
293        _prime_the_session_pump = _prime_the_session_pump
294        # loggy.info("aws._init_internal_sessions(): Primed the session pumps: " + str(_prime_the_session_pump))
295
296        return
297
298    @contextlib.contextmanager
299    def __unset_aws_envs(self, **environ):
300        """
301        __unset_aws_envs()
302
303        Temporarily unset aws environment variables.
304
305        >>> with _unset_aws_envs():
306        ...   "AWS_ACCESS_KEY_ID" in os.environ
307        False
308        ...   "AWS_SECRET_ACCESS_KEY" in os.environ
309        False
310        ...   "AWS_DEFAULT_REGION" in os.environ
311        False
312
313        >>> "AWS_ACCESS_KEY_ID" in os.environ
314        AFXYZXYZXYZ
315
316        :type environ: dict[str, unicode]
317        """
318        old_environ = dict(os.environ)
319        if os.environ.get("AWS_ACCESS_KEY_ID"):
320            # loggy.info("aws._unset_aws_envs(): Temp Deleting AWS_ACCESS_KEY_ID from ENV")
321            del os.environ["AWS_ACCESS_KEY_ID"]
322        if os.environ.get("AWS_SECRET_ACCESS_KEY"):
323            # loggy.info("aws._unset_aws_envs(): Temp Deleting AWS_SECRET_ACCESS_KEY from ENV")
324            del os.environ["AWS_SECRET_ACCESS_KEY"]
325        if os.environ.get("AWS_DEFAULT_REGION"):
326            # loggy.info("aws._unset_aws_envs(): Temp Deleting AWS_SECRET_ACCESS_KEY from ENV")
327            del os.environ["AWS_DEFAULT_REGION"]
328
329        try:
330            # loggy.info("aws._unset_aws_envs(): Yielding back temp modified environment")
331            yield
332        finally:
333            # loggy.info("aws._unset_aws_envs(): Clearing temp environ changes and returning original environ")
334            os.environ.clear()
335            os.environ.update(old_environ)
336
337    """
338    Interal SSM functions
339    """
340
341    def ssm_get_parameter(self, name: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None):
342        """
343        ssm_get_parameter()
344
345        Get an SSM Parameter Value from the build com or gov account
346
347        name: String containing param friendly name or arn. Will convert arn into friendly name before use.
348        region: String defaults to AWS_DEFAULT_REGION or us-east-1
349        session: aws.Sessions() will use a different session to build the client, default is _session
350        """
351
352        _s = self.__get_session_build_com() if session is None else session
353        _r = _s.region_name if region is None else region
354        if 'gov' in _r:
355            _s = self.__get_session_build_gov() if session is None else session
356
357        loggy.info(f"aws.AwsSessions().ssm_get_parameter(): BEGIN (using session named: {_s.name})")
358
359        return ssm_get_parameter(name=name, session=_s, region=_r)
360
361    """
362    Internal ECR functions
363    """
364
365    def ecr_generate_fqcn(self, container: str) -> [str, str]:
366        """
367        ecr_generate_fqcn()
368
369        Generate a fully qualified aws docker container string for the build account
370        (i.e. 552324424.dkr.ecr.us-east-1.amazonaws.com/mirrored/timothy)
371
372        container: String representing container name:tag
373
374        Returns: String container and String tag (None if tag doesn't exist)
375        """
376        _s = self.__get_session_ecr_build_com()
377        _r = ecr_get_region()
378
379        loggy.info(f"aws.AwsSessions().ecr_generate_fqcn(): BEGIN (using session named: {_s.name})")
380        return ecr_generate_fqcn(container=container, session=_s, region=_r)
381
382    def ecr_tag_to_build(self, container: str, tag_list: list) -> bool:
383        """
384        ecr_tag_to_build()
385
386        Add a Tag to an existing remote ecr container.
387        Example: ecr_tag(container="123456789.dkr.ecr.us-east-1.amazonaws.com/mirrored/timothy:1234", tag="latest")
388
389        container: String containing existing remote container with tag "container:tag"
390        tag_list: String List containing new tags to add to the local container
391        """
392        _s = self.__get_session_ecr_build_com()
393        _r = ecr_get_region()
394
395        loggy.info(f"aws.AwsSessions().ecr_tag_to_build(): BEGIN (using session named: {_s.name})")
396        for tag in tag_list:
397            if not ecr_tag(container=container, tag=tag, session=_s, region=_r):
398                loggy.info(f"aws.AwsSessions().ecr_tag_to_build(): Failed to put tag {tag}")
399                return False
400
401        return True
402
403    def ecr_promote_to_gov(self, container: str, tag: str, push_all_tags: typing.Optional[bool] = True) -> bool:
404        """
405        ecr_promote_to_gov()
406
407        Pull container from ECR Commercial and Push it into ECR GovCloud.
408        NOTE: This will ONLY work from a gov pipeline!
409
410        container: String containing existing remote container with tag "container"
411        tag: String containing existing tag for container
412        push_all_tags: bool True/False defaults to True to push all tags for this container:tag pair
413
414        Returns: True/False
415        """
416        if 'gov' not in os.environ.get('AWS_DEFAULT_REGION'):
417            loggy.info("aws.AwsSession().ecr_promote_to_gov(): THIS IS NOT A GOV PIPELINE! RETURNING FALSE.")
418            return False
419
420        _s_from = self.__get_session_ecr_build_com()
421        _r_from = ecr_get_region()
422
423        _s_to = self.__get_session_ecr_build_gov()
424        _r_to = ecr_get_region(_s_to)
425        _reg_id_to = ecr_get_registry_id(_s_to)
426
427        loggy.info(f"aws.AwsSessions().ecr_promote_to_gov(): BEGIN (using sessions named: {_s_from.name} and {_s_to.name})")
428
429        _c, _t = self.ecr_pull_from_build(container, tag)
430
431        # Grab all of the tags for this image
432        # aws --profile build-com ecr describe-images --repository-name 'warp/api' --image-ids imageTag=prod_rc | jq .imageDetails[0].imageTags
433        _details = ecr_describe_image(container=_c, tag=_t, session=_s_from, region=_r_from)
434        _tag_list = _details['imageDetails'][0]['imageTags']
435        if not ecr_login(registry_id=_reg_id_to, session=_s_to, region=_r_to):
436            loggy.info("aws.AwsSessions().ecr_promote_to_gov(): Failed to log into the gov build ECR")
437            return False
438
439        return self.ecr_push_and_scan(container=_c, tag=_t, tag_list=_tag_list, session=_s_to)
440
441    def ecr_pull_from_build(self, container: str, tag: str) -> [str, str]:
442        """
443        ecr_pull_from_build()
444
445        Pull an ECR container from the build account.
446
447        container: String containing existing remote container
448        tag: String containing tag of remote container
449
450        Returns: Tuple [str, str] First str is full ECR name of container, second str is tag
451        """
452        _s = self.__get_session_ecr_build_com()
453        _ri = ecr_get_registry_id()
454        _r = ecr_get_region()
455
456        loggy.info(f"aws.AwsSessions().ecr_pull_from_build(): BEGIN (using session named: {_s.name})")
457
458        if not ecr_login(registry_id=_ri, session=_s, region=_r):
459            return None, None
460
461        return ecr_pull(container=container, tag=tag, session=_s, region=_r)
462
463    def ecr_push_and_scan(self, container: str, tag: str, tag_list: typing.Optional[list] = None, session: typing.Optional[AwsSession] = None) -> bool:
464        """
465        ecr_push_and_scan()
466
467        Push a new container to build account ECR.
468        This is only allowed from develop or hotfix branches.
469
470        container: String representing local container name
471        tag: String with existing local tag for container name
472        tag_list: Optional String List of additional tags to push with the container
473        session: will use a different to build the client, default is build session
474
475        Returns: True/False
476        """
477        _b = get_source_branch()
478
479        if 'develop' not in _b and 'hotfix' not in _b and 'gov' not in os.environ.get('AWS_DEFAULT_REGION'):
480            # check if image exists already. If it doesn't, we cannot make a new one. If it does we can promote it.
481            loggy.info(f"aws.AwsSessions().ecr_push_and_scan(): Cannot push from branch ({_b}). Only develop and hotfix allowed.")
482            sys.exit(f"aws.AwsSessions().ecr_push_and_scan(): Cannot push from branch ({_b}). Only develop and hotfix allowed.")
483
484        _s = self.__get_session_ecr_build_com() if session is None else session
485        _ri = ecr_get_registry_id(session=_s)
486        _r = ecr_get_region(session=_s)
487
488        loggy.info(f"aws.AwsSessions().ecr_push_and_scan(): BEGIN (using session named: {_s.name})")
489        try:
490            ecr_login(registry_id=_ri, session=_s, region=_r)
491
492            #
493            # We only want to scan the first tag we push.
494            #
495            _container, _tag = ecr_generate_fqcn(container=f"{container}:{tag}", session=_s, region=_r)
496
497            loggy.info(f"aws.AwsSessions().ecr_push_and_scan(): Pushing local container ({container}:{tag}) to remote ({_container}:{_tag})")
498
499            _temp_tag = f"{get_version()}.SCAN"
500
501            loggy.info("aws.AwsSessions().ecr_push_and_scan(): TAG FOR SCAN")
502            loggy.info(f"docker tag {container}:{_tag} {_container}:{_temp_tag}")
503            if not docker.docker("tag", f"{container}:{_tag}", f"{_container}:{_temp_tag}"):
504                return False
505
506            loggy.info("aws.AwsSessions().ecr_push_and_scan(): PUSH TAG FOR SCAN")
507            if not docker.docker("push", f"{_container}:{_temp_tag}"):
508                return False
509
510            #
511            # ECR Scan
512            #
513            if ecr_scan(container=container, tag=_temp_tag, session=_s, region=_r):
514                #
515                # Push the original local tag into ECR
516                #
517                loggy.info(f"aws.AwsSessions().ecr_push_and_scan(): Post Scan - Pushing original tag ({tag}) using manifest")
518                if not ecr_tag(container=f"{_container}:{_temp_tag}", tag=tag, session=_s, region=_r):
519                    return False
520                # if not docker.docker("tag", f"{_container}:{_temp_tag}", f"{_container}:{tag}"):
521                #     return False
522                # if not docker.docker("push", f"{_container}:{tag}"):
523                #     return False
524
525                if tag_list:
526                    for t in tag_list:
527                        loggy.info(f"aws.AwsSessions().ecr_push_and_scan(): Post Scan - Pushing tag ({t}) using manifest")
528                        if not ecr_tag(container=f"{_container}:{_temp_tag}", tag=t, session=_s, region=_r):
529                            return False
530                        # if not docker.docker("tag", f"{_container}:{_temp_tag}", f"{_container}:{t}"):
531                        #     return False
532                        # if not docker.docker("push", f"{_container}:{t}"):
533                        #     return False
534
535                #
536                # Push initial set of tags for this container so we have them ready when we
537                # build out any of our standard named environments.
538                #
539                for t in ecr_check_tag_exists(container=_container, tag_list=['develop', 'qa', 'stage', 'hotfix', 'demo', 'prod'], session=_s, region=_r):
540                    loggy.info(f"aws.AwsSessions().ecr_push_and_scan(): CDK tag for ENV {t} does not exist. Pushing tag so CDK can deploy/create successfully.")
541                    _build_stub_container, _build_stub_tag = ecr_pull_from_build(container='gocd/stub', tag='stub')
542                    if not ecr_login(registry_id=_ri, session=_s, region=_r):
543                        return False
544                    _stub_container, _stub_tag = ecr_generate_fqcn(container="gocd/stub:stub", session=_s, region=_r)
545                    if not docker.docker("tag", f"{_build_stub_container}:{_build_stub_tag}", f"{_container}:{t}"):
546                        return False
547                    if not docker.docker("push", f"{_container}:{t}"):
548                        return False
549
550                # else:
551                #     loggy.info("aws.AwsSessions().ecr_push_and_scan(): No more tags to push")
552            else:
553                # Remove the original tag and add a FAILED tag to this container
554                if not docker.docker("tag", f"{_container}:{_temp_tag}", f"{_container}:FAILED_SCAN"):
555                    return False
556                if not docker.docker("push", f"{_container}:FAILED_SCAN"):
557                    return False
558
559                try:
560                    client = _s.session.client('ecr', region_name=_r)
561                    response = client.batch_delete_image(
562                        repositoryName=ecr_strip_container_name(_container),
563                        imageIds=[{
564                            'imageTag': _temp_tag
565                        }]
566                    )
567                except Exception as e:
568                    loggy.info(f"aws.AwsSessions().ecr_push_and_scan(): Failed to delete the temp tag {_temp_tag} from container after scan failed. {str(e)}")
569                    fail_code = response['failures'][0]['failureCode']
570                    fail_reason = response['failures'][0]['failureReason']
571                    loggy.info(f"Failure Code ({fail_code}). Failure Reason ({fail_reason})")
572
573                loggy.info("aws.AwsSessions().ecr_push_and_scan(): Container Scan failed. Check out pipeline results tab or ECR for Vulns.")
574
575        except Exception as e:
576            loggy.error("aws.AwsSessions().ecr_push_and_scan(): Logging out of ECR repo.")
577            ecr_logout(registry_id=_ri, session=_s, region=_r)
578            sys.exit(f"aws.AwsSessions().ecr_push_and_scan(): Error: {str(e)}")
579
580        loggy.info("aws.AwsSessions().ecr_push_and_scan(): Logging out of ECR repo.")
581        ecr_logout(registry_id=_ri, session=_s, region=_r)
582
583        return True
AwsSessions()
82    def __init__(self):
83        self.__init_session()
84        self.name = self.__session.name
85        self.region = self.__session.creds.region
86        self.session = self.__session
def get_session(self) -> boto3.session.Session:
88    def get_session(self) -> boto3.Session:
89        """
90        get_session()
91
92        Get a reusable boto3.Session() to run your own boto3 commands into aws.
93        """
94        return self.__session.session

get_session()

Get a reusable boto3.Session() to run your own boto3 commands into aws.

def get_creds(self) -> utils.aws.AwsCreds:
 96    def get_creds(self) -> AwsCreds:
 97        """
 98        get_creds()
 99
100        Get a reusable AwsCreds() to run your own CLI commands into aws.
101        """
102        return self.__session.creds

get_creds()

Get a reusable AwsCreds() to run your own CLI commands into aws.

def ssm_get_parameter( self, name: str, session: Union[utils.aws.AwsSession, NoneType] = None, region: Union[str, NoneType] = None)
341    def ssm_get_parameter(self, name: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None):
342        """
343        ssm_get_parameter()
344
345        Get an SSM Parameter Value from the build com or gov account
346
347        name: String containing param friendly name or arn. Will convert arn into friendly name before use.
348        region: String defaults to AWS_DEFAULT_REGION or us-east-1
349        session: aws.Sessions() will use a different session to build the client, default is _session
350        """
351
352        _s = self.__get_session_build_com() if session is None else session
353        _r = _s.region_name if region is None else region
354        if 'gov' in _r:
355            _s = self.__get_session_build_gov() if session is None else session
356
357        loggy.info(f"aws.AwsSessions().ssm_get_parameter(): BEGIN (using session named: {_s.name})")
358
359        return ssm_get_parameter(name=name, session=_s, region=_r)

ssm_get_parameter()

Get an SSM Parameter Value from the build com or gov account

name: String containing param friendly name or arn. Will convert arn into friendly name before use. region: String defaults to AWS_DEFAULT_REGION or us-east-1 session: aws.Sessions() will use a different session to build the client, default is _session

def ecr_generate_fqcn(self, container: str) -> [, ]:
365    def ecr_generate_fqcn(self, container: str) -> [str, str]:
366        """
367        ecr_generate_fqcn()
368
369        Generate a fully qualified aws docker container string for the build account
370        (i.e. 552324424.dkr.ecr.us-east-1.amazonaws.com/mirrored/timothy)
371
372        container: String representing container name:tag
373
374        Returns: String container and String tag (None if tag doesn't exist)
375        """
376        _s = self.__get_session_ecr_build_com()
377        _r = ecr_get_region()
378
379        loggy.info(f"aws.AwsSessions().ecr_generate_fqcn(): BEGIN (using session named: {_s.name})")
380        return ecr_generate_fqcn(container=container, session=_s, region=_r)

ecr_generate_fqcn()

Generate a fully qualified aws docker container string for the build account (i.e. 552324424.dkr.ecr.us-east-1.amazonaws.com/mirrored/timothy)

container: String representing container name:tag

Returns: String container and String tag (None if tag doesn't exist)

def ecr_tag_to_build(self, container: str, tag_list: list) -> bool:
382    def ecr_tag_to_build(self, container: str, tag_list: list) -> bool:
383        """
384        ecr_tag_to_build()
385
386        Add a Tag to an existing remote ecr container.
387        Example: ecr_tag(container="123456789.dkr.ecr.us-east-1.amazonaws.com/mirrored/timothy:1234", tag="latest")
388
389        container: String containing existing remote container with tag "container:tag"
390        tag_list: String List containing new tags to add to the local container
391        """
392        _s = self.__get_session_ecr_build_com()
393        _r = ecr_get_region()
394
395        loggy.info(f"aws.AwsSessions().ecr_tag_to_build(): BEGIN (using session named: {_s.name})")
396        for tag in tag_list:
397            if not ecr_tag(container=container, tag=tag, session=_s, region=_r):
398                loggy.info(f"aws.AwsSessions().ecr_tag_to_build(): Failed to put tag {tag}")
399                return False
400
401        return True

ecr_tag_to_build()

Add a Tag to an existing remote ecr container. Example: ecr_tag(container="123456789.dkr.ecr.us-east-1.amazonaws.com/mirrored/timothy:1234", tag="latest")

container: String containing existing remote container with tag "container:tag" tag_list: String List containing new tags to add to the local container

def ecr_promote_to_gov( self, container: str, tag: str, push_all_tags: Union[bool, NoneType] = True) -> bool:
403    def ecr_promote_to_gov(self, container: str, tag: str, push_all_tags: typing.Optional[bool] = True) -> bool:
404        """
405        ecr_promote_to_gov()
406
407        Pull container from ECR Commercial and Push it into ECR GovCloud.
408        NOTE: This will ONLY work from a gov pipeline!
409
410        container: String containing existing remote container with tag "container"
411        tag: String containing existing tag for container
412        push_all_tags: bool True/False defaults to True to push all tags for this container:tag pair
413
414        Returns: True/False
415        """
416        if 'gov' not in os.environ.get('AWS_DEFAULT_REGION'):
417            loggy.info("aws.AwsSession().ecr_promote_to_gov(): THIS IS NOT A GOV PIPELINE! RETURNING FALSE.")
418            return False
419
420        _s_from = self.__get_session_ecr_build_com()
421        _r_from = ecr_get_region()
422
423        _s_to = self.__get_session_ecr_build_gov()
424        _r_to = ecr_get_region(_s_to)
425        _reg_id_to = ecr_get_registry_id(_s_to)
426
427        loggy.info(f"aws.AwsSessions().ecr_promote_to_gov(): BEGIN (using sessions named: {_s_from.name} and {_s_to.name})")
428
429        _c, _t = self.ecr_pull_from_build(container, tag)
430
431        # Grab all of the tags for this image
432        # aws --profile build-com ecr describe-images --repository-name 'warp/api' --image-ids imageTag=prod_rc | jq .imageDetails[0].imageTags
433        _details = ecr_describe_image(container=_c, tag=_t, session=_s_from, region=_r_from)
434        _tag_list = _details['imageDetails'][0]['imageTags']
435        if not ecr_login(registry_id=_reg_id_to, session=_s_to, region=_r_to):
436            loggy.info("aws.AwsSessions().ecr_promote_to_gov(): Failed to log into the gov build ECR")
437            return False
438
439        return self.ecr_push_and_scan(container=_c, tag=_t, tag_list=_tag_list, session=_s_to)

ecr_promote_to_gov()

Pull container from ECR Commercial and Push it into ECR GovCloud. NOTE: This will ONLY work from a gov pipeline!

container: String containing existing remote container with tag "container" tag: String containing existing tag for container push_all_tags: bool True/False defaults to True to push all tags for this container:tag pair

Returns: True/False

def ecr_pull_from_build(self, container: str, tag: str) -> [, ]:
441    def ecr_pull_from_build(self, container: str, tag: str) -> [str, str]:
442        """
443        ecr_pull_from_build()
444
445        Pull an ECR container from the build account.
446
447        container: String containing existing remote container
448        tag: String containing tag of remote container
449
450        Returns: Tuple [str, str] First str is full ECR name of container, second str is tag
451        """
452        _s = self.__get_session_ecr_build_com()
453        _ri = ecr_get_registry_id()
454        _r = ecr_get_region()
455
456        loggy.info(f"aws.AwsSessions().ecr_pull_from_build(): BEGIN (using session named: {_s.name})")
457
458        if not ecr_login(registry_id=_ri, session=_s, region=_r):
459            return None, None
460
461        return ecr_pull(container=container, tag=tag, session=_s, region=_r)

ecr_pull_from_build()

Pull an ECR container from the build account.

container: String containing existing remote container tag: String containing tag of remote container

Returns: Tuple [str, str] First str is full ECR name of container, second str is tag

def ecr_push_and_scan( self, container: str, tag: str, tag_list: Union[list, NoneType] = None, session: Union[utils.aws.AwsSession, NoneType] = None) -> bool:
463    def ecr_push_and_scan(self, container: str, tag: str, tag_list: typing.Optional[list] = None, session: typing.Optional[AwsSession] = None) -> bool:
464        """
465        ecr_push_and_scan()
466
467        Push a new container to build account ECR.
468        This is only allowed from develop or hotfix branches.
469
470        container: String representing local container name
471        tag: String with existing local tag for container name
472        tag_list: Optional String List of additional tags to push with the container
473        session: will use a different to build the client, default is build session
474
475        Returns: True/False
476        """
477        _b = get_source_branch()
478
479        if 'develop' not in _b and 'hotfix' not in _b and 'gov' not in os.environ.get('AWS_DEFAULT_REGION'):
480            # check if image exists already. If it doesn't, we cannot make a new one. If it does we can promote it.
481            loggy.info(f"aws.AwsSessions().ecr_push_and_scan(): Cannot push from branch ({_b}). Only develop and hotfix allowed.")
482            sys.exit(f"aws.AwsSessions().ecr_push_and_scan(): Cannot push from branch ({_b}). Only develop and hotfix allowed.")
483
484        _s = self.__get_session_ecr_build_com() if session is None else session
485        _ri = ecr_get_registry_id(session=_s)
486        _r = ecr_get_region(session=_s)
487
488        loggy.info(f"aws.AwsSessions().ecr_push_and_scan(): BEGIN (using session named: {_s.name})")
489        try:
490            ecr_login(registry_id=_ri, session=_s, region=_r)
491
492            #
493            # We only want to scan the first tag we push.
494            #
495            _container, _tag = ecr_generate_fqcn(container=f"{container}:{tag}", session=_s, region=_r)
496
497            loggy.info(f"aws.AwsSessions().ecr_push_and_scan(): Pushing local container ({container}:{tag}) to remote ({_container}:{_tag})")
498
499            _temp_tag = f"{get_version()}.SCAN"
500
501            loggy.info("aws.AwsSessions().ecr_push_and_scan(): TAG FOR SCAN")
502            loggy.info(f"docker tag {container}:{_tag} {_container}:{_temp_tag}")
503            if not docker.docker("tag", f"{container}:{_tag}", f"{_container}:{_temp_tag}"):
504                return False
505
506            loggy.info("aws.AwsSessions().ecr_push_and_scan(): PUSH TAG FOR SCAN")
507            if not docker.docker("push", f"{_container}:{_temp_tag}"):
508                return False
509
510            #
511            # ECR Scan
512            #
513            if ecr_scan(container=container, tag=_temp_tag, session=_s, region=_r):
514                #
515                # Push the original local tag into ECR
516                #
517                loggy.info(f"aws.AwsSessions().ecr_push_and_scan(): Post Scan - Pushing original tag ({tag}) using manifest")
518                if not ecr_tag(container=f"{_container}:{_temp_tag}", tag=tag, session=_s, region=_r):
519                    return False
520                # if not docker.docker("tag", f"{_container}:{_temp_tag}", f"{_container}:{tag}"):
521                #     return False
522                # if not docker.docker("push", f"{_container}:{tag}"):
523                #     return False
524
525                if tag_list:
526                    for t in tag_list:
527                        loggy.info(f"aws.AwsSessions().ecr_push_and_scan(): Post Scan - Pushing tag ({t}) using manifest")
528                        if not ecr_tag(container=f"{_container}:{_temp_tag}", tag=t, session=_s, region=_r):
529                            return False
530                        # if not docker.docker("tag", f"{_container}:{_temp_tag}", f"{_container}:{t}"):
531                        #     return False
532                        # if not docker.docker("push", f"{_container}:{t}"):
533                        #     return False
534
535                #
536                # Push initial set of tags for this container so we have them ready when we
537                # build out any of our standard named environments.
538                #
539                for t in ecr_check_tag_exists(container=_container, tag_list=['develop', 'qa', 'stage', 'hotfix', 'demo', 'prod'], session=_s, region=_r):
540                    loggy.info(f"aws.AwsSessions().ecr_push_and_scan(): CDK tag for ENV {t} does not exist. Pushing tag so CDK can deploy/create successfully.")
541                    _build_stub_container, _build_stub_tag = ecr_pull_from_build(container='gocd/stub', tag='stub')
542                    if not ecr_login(registry_id=_ri, session=_s, region=_r):
543                        return False
544                    _stub_container, _stub_tag = ecr_generate_fqcn(container="gocd/stub:stub", session=_s, region=_r)
545                    if not docker.docker("tag", f"{_build_stub_container}:{_build_stub_tag}", f"{_container}:{t}"):
546                        return False
547                    if not docker.docker("push", f"{_container}:{t}"):
548                        return False
549
550                # else:
551                #     loggy.info("aws.AwsSessions().ecr_push_and_scan(): No more tags to push")
552            else:
553                # Remove the original tag and add a FAILED tag to this container
554                if not docker.docker("tag", f"{_container}:{_temp_tag}", f"{_container}:FAILED_SCAN"):
555                    return False
556                if not docker.docker("push", f"{_container}:FAILED_SCAN"):
557                    return False
558
559                try:
560                    client = _s.session.client('ecr', region_name=_r)
561                    response = client.batch_delete_image(
562                        repositoryName=ecr_strip_container_name(_container),
563                        imageIds=[{
564                            'imageTag': _temp_tag
565                        }]
566                    )
567                except Exception as e:
568                    loggy.info(f"aws.AwsSessions().ecr_push_and_scan(): Failed to delete the temp tag {_temp_tag} from container after scan failed. {str(e)}")
569                    fail_code = response['failures'][0]['failureCode']
570                    fail_reason = response['failures'][0]['failureReason']
571                    loggy.info(f"Failure Code ({fail_code}). Failure Reason ({fail_reason})")
572
573                loggy.info("aws.AwsSessions().ecr_push_and_scan(): Container Scan failed. Check out pipeline results tab or ECR for Vulns.")
574
575        except Exception as e:
576            loggy.error("aws.AwsSessions().ecr_push_and_scan(): Logging out of ECR repo.")
577            ecr_logout(registry_id=_ri, session=_s, region=_r)
578            sys.exit(f"aws.AwsSessions().ecr_push_and_scan(): Error: {str(e)}")
579
580        loggy.info("aws.AwsSessions().ecr_push_and_scan(): Logging out of ECR repo.")
581        ecr_logout(registry_id=_ri, session=_s, region=_r)
582
583        return True

ecr_push_and_scan()

Push a new container to build account ECR. This is only allowed from develop or hotfix branches.

container: String representing local container name tag: String with existing local tag for container name tag_list: Optional String List of additional tags to push with the container session: will use a different to build the client, default is build session

Returns: True/False

def get_aws_account_id(session: Union[utils.aws.AwsSession, NoneType] = None) -> str:
598def get_aws_account_id(session: typing.Optional[AwsSession] = None) -> str:
599    """
600    get_aws_account_id()
601
602    Get the aws account ID using the local AWS_ACCESS_KEY_ID credentials.
603
604    session will use a different session to build the client, default is _sessions
605    """
606    global _sessions
607    _s = _sessions.session if session is None else session
608    loggy.info(f"aws.get_aws_account_id(): BEGIN (using session named: {_s.name})")
609
610    try:
611        client = _s.session.client('sts')
612        response = client.get_access_key_info(AccessKeyId=_s.creds.access_key)
613        if 'Account' not in response:
614            loggy.error("Error: Account ID not returned")
615    except Exception as e:
616        loggy.error("Error: " + str(e))
617
618    return response['Account']

get_aws_account_id()

Get the aws account ID using the local AWS_ACCESS_KEY_ID credentials.

session will use a different session to build the client, default is _sessions

def get_region(session: Union[utils.aws.AwsSession, NoneType] = None) -> str:
621def get_region(session: typing.Optional[AwsSession] = None) -> str:
622    """
623    get_region()
624
625    Get the aws region from the session.
626
627    session will use a different session to build the client, default is global _sessions
628    """
629    global _sessions
630    _s = _sessions.session if session is None else session
631    loggy.info(f"aws.get_region(): BEGIN (using session named: {_s.name})")
632
633    return _s.session.region_name

get_region()

Get the aws region from the session.

session will use a different session to build the client, default is global _sessions

def get_session()
636def get_session():
637    """
638    get_session()
639
640    Get the aws session that was instantiated on import. This will be tied to
641    the local pipeline environment. Useful if you want to build out your own
642    boto3 clients to run commands that aren't yet supported in this library.
643
644    returns: boto3.Session() object
645    """
646    global _sessions
647    loggy.info(f"aws.get_session(): BEGIN (using session named: {_sessions.name})")
648    return _sessions.session.session

get_session()

Get the aws session that was instantiated on import. This will be tied to the local pipeline environment. Useful if you want to build out your own boto3 clients to run commands that aren't yet supported in this library.

returns: boto3.Session() object

def cloudfront_create_invalidation( dist: str, items: str, session: Union[utils.aws.AwsSession, NoneType] = None, region: Union[str, NoneType] = None) -> str:
656def cloudfront_create_invalidation(dist: str, items: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> str:
657    """
658    cloudfront_create_invalidation()
659
660    Create an invalidation on a CloudFront distribution.
661
662    dist: String Distribution ID
663    items: String List of paths. (i.e. ["/*"])
664    session: will use a different session to build the client, default is _sessions
665    """
666    global _sessions
667
668    _s = _sessions.session if session is None else session
669    _r = _s.session.region_name if region is None else region
670    loggy.info(f"aws.cloudfront_create_invalidation(): BEGIN (using session named: {_s.name})")
671
672    try:
673        client = _s.session.client('cloudfront', region_name=_r)
674        response = client.create_invalidation(
675            DistributionId=dist,
676            InvalidationBatch={
677                'Paths': {
678                    'Quantity': 1,
679                    'Items': items
680                },
681                'CallerReference': str(time.time()).replace(".", "")
682            }
683        )
684    except Exception as e:
685        loggy.error("Error: " + str(e))
686
687    invalidation_id = response['Invalidation']['Id']
688    return invalidation_id

cloudfront_create_invalidation()

Create an invalidation on a CloudFront distribution.

dist: String Distribution ID items: String List of paths. (i.e. ["/*"]) session: will use a different session to build the client, default is _sessions

def ecs_deploy( clusterArn: str, serviceArn: str, tag: Union[str, NoneType] = None, session: Union[utils.aws.AwsSession, NoneType] = None, region: Union[str, NoneType] = None) -> bool:
696def ecs_deploy(clusterArn: str, serviceArn: str, tag: typing.Optional[str] = None, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> bool:
697    """
698    ecsDeploy.py
699
700    Deploy a new task definition to an existing ECS Cluster/Service
701
702    NOTE 1: This is NOT blue/green. Look at `ecsBlueGreenDeploy.py`
703    NOTE 2: This will check if the clusterArn/serviceArn are NOT aws arns and instead
704            use them as ssm parameter names to grab the values
705
706    clusterArn: String (Optional) Will use ECS_CLUSTER_ARN from os.environ as default
707    serviceArn: String (Optional) Will use ECS_SERVICE_ARN from os.environ as default
708    tag: String (Optional) - Will use `release.get_version()` as default
709    session: AwsSession (Optional) will use a different session to build the client, default is _sessions
710    region: String (Optional)
711
712    Returns: True/False
713    """
714    global _sessions
715
716    _s = _sessions.session if session is None else session
717    _r = _s.session.region_name if region is None else region
718    loggy.info(f"aws.ecsDeploy(): BEGIN (using session named: {_s.name})")
719
720    _TAG = tag if tag is not None else get_version()
721    _CLUSTER_ARN = clusterArn
722    _SERVICE_ARN = serviceArn
723
724    """
725    If the _CLUSTER_ARN or _SERVICE_ARN are not in ARN format, consider them SSM Param names
726    and use them to grab the ARNS out of SSM Params
727    """
728    _CLUSTER = _CLUSTER_ARN if ':' in _CLUSTER_ARN else ssm_get_parameter(name=_CLUSTER_ARN, session=_s, region=_r)
729    _SERVICE = _SERVICE_ARN if ':' in _SERVICE_ARN else ssm_get_parameter(name=_SERVICE_ARN, session=_s, region=_r)
730
731    """
732    go get the entire task definition for a service by name (might need the cluster too)
733    """
734    loggy.info("aws.ecsDeploy(): Looking up latest task definition for cluster/service")
735    current_task_definition_arn = ecs_get_latest_task_definition_arn(cluster=_CLUSTER, service=_SERVICE, session=_s, region=_r)
736    loggy.info("aws.ecsDeploy(): Storing the entire current task definition for rollback")
737    current_task_definition = ecs_get_task_definition_from_arn(task_def_arn=current_task_definition_arn, session=_s, region=_r)
738
739    """
740    This iterates through all current_task_definition.containers
741    then for each container -> iterate through container.secrets
742    for each secret -> find one where the key is "VERSION"
743    if found, return the value which will be an ssm param arn
744    """
745    version_ssm_param_arn = ecs_get_version_param_name_from_task_def(task_def=current_task_definition)
746
747    """
748    Get the currently deployed version number
749    """
750    old_version = ssm_get_parameter(name=version_ssm_param_arn, session=_s, region=_r)
751
752    """
753    set the new version provided by the caller
754    """
755    new_version = _TAG
756
757    """
758    This iterates over the containers again to set
759    the new image in the container where
760    we simply get the old image and replace the :{tag}
761    before: docker.devops.rekor.io/blue/api:12345
762    after: docker.devops.rekor.io/blue/api:$newVersion
763    """
764    new_task_definition = ecs_set_new_image_in_task_def(task_def=current_task_definition, version=new_version)
765
766    """
767    Go register the next task def
768    there should now be a new version of the task def
769    """
770    new_task_definition_arn = ecs_register_task_definition_revision(task_def=new_task_definition, session=_s, region=_r)
771
772    """
773    update the ssm param with the new tag.
774    This function should fail gracefully as not all appilcations use an SSM param
775    to store its version. Scout uses the git commit hash for version awareness.
776    """
777    ssm_put_parameter(name=version_ssm_param_arn, value=_TAG, session=_s, region=_r)
778
779    """
780    deploy new task def to the service
781    """
782    ecs_deploy_new_task_definition(cluster=_CLUSTER, service=_SERVICE, task_def_arn=new_task_definition_arn, session=_s, region=_r)
783
784    deploy_status = ecs_wait_services_stable(cluster=_CLUSTER, service=_SERVICE, session=_s, region=_r)
785    if not deploy_status:
786        loggy.info("ecsDeploy(): Deploy FAILED! Rolling back to original task def!")
787        # Roll back procedures by rolling back the version param and setting the service back to the original task def
788        ssm_put_parameter(name=version_ssm_param_arn, value=old_version, session=_s, region=_r)
789        ecs_deploy_new_task_definition(cluster=_CLUSTER, service=_SERVICE, task_def_arn=current_task_definition_arn, session=_s, region=_r)
790        deploy_status = ecs_wait_services_stable(cluster=_CLUSTER, service=_SERVICE, session=_s, region=_r)
791        if not deploy_status:
792            raise Exception("aws.ecsDeploy(): Rolling back to original task def failed!")
793
794        ecs_deregister_task_def(task_def=new_task_definition_arn, session=_s, region=_r)
795        loggy.info("aws.ecsDeploy(): Deploy Failed! Rolled back to original task def.")
796        return False
797
798    loggy.info("aws.ecsDeploy(): Deploy Successful")
799    return True

ecsDeploy.py

Deploy a new task definition to an existing ECS Cluster/Service

NOTE 1: This is NOT blue/green. Look at ecsBlueGreenDeploy.py NOTE 2: This will check if the clusterArn/serviceArn are NOT aws arns and instead use them as ssm parameter names to grab the values

clusterArn: String (Optional) Will use ECS_CLUSTER_ARN from os.environ as default serviceArn: String (Optional) Will use ECS_SERVICE_ARN from os.environ as default tag: String (Optional) - Will use release.get_version() as default session: AwsSession (Optional) will use a different session to build the client, default is _sessions region: String (Optional)

Returns: True/False

def ecs_get_latest_task_definition_arn( cluster: str, service: str, session: Union[utils.aws.AwsSession, NoneType] = None, region: Union[str, NoneType] = None) -> str:
802def ecs_get_latest_task_definition_arn(cluster: str, service: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> str:
803    """
804    ecs_get_latest_task_definition_arn()
805
806    Get the latest task definition arn for a particular service in a particular cluster
807
808    cluster: String containing ECS Cluster ARN
809    service: String containing ECS Service ARN
810
811    Returns: String containing task def arn
812    """
813    global _sessions
814    _s = _sessions.session if session is None else session
815    _r = _s.session.region_name if region is None else region
816    loggy.info(f"aws.ecs_get_latest_task_definition_arn(): BEGIN (using session named: {_s.name})")
817
818    loggy.info(f"aws.ecs_get_latest_task_definition_arn(): Searching for latest task_definition_arn in cluster/service ({cluster} / {service})")
819
820    task_def_arn = None
821    try:
822        client = _s.session.client('ecs', region_name=_r)
823        response = client.describe_services(
824            cluster=cluster,
825            services=[
826                service
827            ]
828        )
829        task_def_arn = response['services'][0]['taskDefinition']
830    except Exception as e:
831        loggy.error(f"aws.ecs_get_latest_task_definition_arn(): Error: {str(e)}")
832        raise
833
834    return task_def_arn

ecs_get_latest_task_definition_arn()

Get the latest task definition arn for a particular service in a particular cluster

cluster: String containing ECS Cluster ARN service: String containing ECS Service ARN

Returns: String containing task def arn

def ecs_get_task_definition_from_arn( task_def_arn: str, session: Union[utils.aws.AwsSession, NoneType] = None, region: Union[str, NoneType] = None) -> dict:
837def ecs_get_task_definition_from_arn(task_def_arn: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> dict:
838    """
839    ecs_get_task_definition_from_arn()
840
841    Get clonable task definition (json) from a task definition arn
842
843    task_def_arn: String containing task def arn
844
845    Returns: dict containing enough of the task def to clone it
846    """
847    global _sessions
848    _s = _sessions.session if session is None else session
849    _r = _s.session.region_name if region is None else region
850    loggy.info(f"aws.ecs_get_task_definition_from_arn(): BEGIN (using session named: {_s.name})")
851
852    loggy.info(f"aws.ecs_get_task_definition_from_arn(): Reading in full task definition from: {task_def_arn}")
853
854    try:
855        client = _s.session.client('ecs', region_name=_r)
856        response = client.describe_task_definition(
857            taskDefinition=task_def_arn
858        )
859
860        task_def = response['taskDefinition']
861    except Exception as e:
862        loggy.error(f"aws.ecs_get_task_definition_from_arn(): Error: {str(e)}")
863        raise
864
865    # remove_props_list = [
866    #     "taskDefinitionArn",
867    #     "revision",
868    #     "status",
869    #     "requiresAttributes",
870    #     "compatibilities",
871    #     "runtimePlatform",
872    #     "inferenceAccelerators",
873    #     "registeredAt",
874    #     "deregisteredAt",
875    #     "registeredBy",
876    #     "ephemeralStorage"
877    # ]
878    # for prop in remove_props_list:
879    #     if prop in task_def:
880    #         del task_def[prop]
881
882    # now we remove any empty values or lists
883    # task_def = remove_empty_from_dict(task_def)
884
885    return task_def

ecs_get_task_definition_from_arn()

Get clonable task definition (json) from a task definition arn

task_def_arn: String containing task def arn

Returns: dict containing enough of the task def to clone it

def ecs_get_version_param_name_from_task_def(task_def: dict) -> str:
899def ecs_get_version_param_name_from_task_def(task_def: dict) -> str:
900    """
901    ecs_get_version_param_name_from_task_def()
902
903    Get the `version` SSM param name from the task definition
904
905    task_def: dict
906
907    Return: String containing version SSM param name
908    """
909    loggy.info(f"aws.ecs_get_version_param_name_from_task_def(): Searching for VERSION ssm parameter arn in containers inside of {task_def}")
910
911    param_name = None
912    if not task_def.get('containerDefinitions'):
913        raise Exception("aws.ecs_get_version_param_name_from_task_def(): Could not locate containerDefinitions inside of the task_def dict")
914        return param_name
915
916    for container in task_def['containerDefinitions']:
917        if container.get('secrets'):
918            param_name = __ecs_check_version_in_secrets(container['secrets'])
919            if param_name:
920                break
921
922    return param_name

ecs_get_version_param_name_from_task_def()

Get the version SSM param name from the task definition

task_def: dict

Return: String containing version SSM param name

def ecs_set_new_image_in_task_def(task_def: dict, version: str) -> dict:
933def ecs_set_new_image_in_task_def(task_def: dict, version: str) -> dict:
934    """
935    ecs_set_new_image_in_task_def()
936
937    Set/replace the tag/version of the containers in a task def hashmap and returns a new hashmap
938
939    task_def: dict containing task definition
940    version: String containiing new container tag/version
941
942    Returns: dict task_def
943    """
944    if not task_def.get('containerDefinitions'):
945        raise Exception("aws.ecs_set_new_image_in_task_def(): containerDefinitions not found in task_def.")
946
947    for container in task_def['containerDefinitions']:
948        if not container.get('image'):
949            raise Exception("aws.ecs_set_new_image_in_task_def(): container image value not found in returned list.")
950            return {}
951
952        _image, _original_image_version = container['image'].split(':')
953        _image = f"{_image}:{version}"
954        loggy.info(f"aws.ecs_set_new_image_in_task_def(): Changing image version ({_original_image_version}) to ({version}) for container named ({container['name']}): new image is ${_image}")
955        container['image'] = _image
956
957    return task_def

ecs_set_new_image_in_task_def()

Set/replace the tag/version of the containers in a task def hashmap and returns a new hashmap

task_def: dict containing task definition version: String containiing new container tag/version

Returns: dict task_def

def ecs_register_task_definition_revision( task_def: dict, session: Union[utils.aws.AwsSession, NoneType] = None, region: Union[str, NoneType] = None) -> dict:
 960def ecs_register_task_definition_revision(task_def: dict, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> dict:
 961    """
 962    ecs_register_new_task_definition()
 963
 964    Register a new task definition in ECS
 965
 966    task_def: dict of a task definition to register
 967
 968    Returns: dict of new task_def
 969    """
 970    global _sessions
 971    _s = _sessions.session if session is None else session
 972    _r = _s.session.region_name if region is None else region
 973    loggy.info(f"aws.ecs_register_new_task_definition(): BEGIN (using session named: {_s.name})")
 974
 975    loggy.info("aws.ecs_register_new_task_definition(): Registering new task definition.")
 976
 977    try:
 978        client = _s.session.client('ecs', region_name=_r)
 979        if 'secrets' in task_def['containerDefinitions'][0]:
 980            response = client.register_task_definition(
 981                family=task_def['family'],
 982                containerDefinitions=task_def['containerDefinitions'],
 983                volumes=task_def['volumes'],
 984                executionRoleArn=task_def['executionRoleArn'],
 985                requiresCompatibilities=task_def['requiresCompatibilities'],
 986                networkMode=task_def['networkMode'],
 987                cpu=task_def['cpu'],
 988                memory=task_def['memory']
 989            )
 990        else:
 991            response = client.register_task_definition(
 992                family=task_def['family'],
 993                containerDefinitions=task_def['containerDefinitions'],
 994                volumes=task_def['volumes'],
 995                requiresCompatibilities=task_def['requiresCompatibilities'],
 996                networkMode=task_def['networkMode'],
 997                cpu=task_def['cpu'],
 998                memory=task_def['memory']
 999            )
1000
1001        task_def = response['taskDefinition']['taskDefinitionArn']
1002    except Exception as e:
1003        loggy.error(f"aws.ecs_register_new_task_definition(): Error: {str(e)}")
1004        return {}
1005
1006    return task_def

ecs_register_new_task_definition()

Register a new task definition in ECS

task_def: dict of a task definition to register

Returns: dict of new task_def

def ecs_deploy_new_task_definition( cluster: str, service: str, task_def_arn: str, session: Union[utils.aws.AwsSession, NoneType] = None, region: Union[str, NoneType] = None) -> bool:
1009def ecs_deploy_new_task_definition(cluster: str, service: str, task_def_arn: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> bool:
1010    """
1011    ecs_deploy_new_task_definition()
1012
1013    Deploy a task defition to a particular ECS Cluster/Service
1014
1015    cluster: String containing ECS Cluster Arn
1016    service: String containing ECS Service Arn
1017    task_def_arn: String containing task definition arn
1018
1019    Returns: True/False
1020    """
1021    global _sessions
1022    _s = _sessions.session if session is None else session
1023    _r = _s.session.region_name if region is None else region
1024    loggy.info(f"aws.ecs_deploy_new_task_definition(): BEGIN (using session named: {_s.name})")
1025
1026    loggy.info(f"aws.ecs_deploy_new_task_definition(): Deploying task defintion ({task_def_arn}) to cluster ({cluster} / service ({service}).")
1027
1028    try:
1029        client = _s.session.client('ecs', region_name=_r)
1030        client.update_service(
1031            cluster=cluster,
1032            service=service,
1033            taskDefinition=task_def_arn
1034        )
1035
1036    except Exception as e:
1037        loggy.error(f"aws.ecs_deploy_new_task_definition(): Error: {str(e)}")
1038        return False
1039
1040    return True

ecs_deploy_new_task_definition()

Deploy a task defition to a particular ECS Cluster/Service

cluster: String containing ECS Cluster Arn service: String containing ECS Service Arn task_def_arn: String containing task definition arn

Returns: True/False

def ecs_wait_services_stable( cluster: str, service: str, session: Union[utils.aws.AwsSession, NoneType] = None, region: Union[str, NoneType] = None) -> bool:
1043def ecs_wait_services_stable(cluster: str, service: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> bool:
1044    """
1045    ecs_wait_services_stable()
1046
1047    Wait up to 10 minutes for ECS services to become stable
1048
1049    cluster: String containing cluster arn
1050    service: String containing service arn
1051
1052    Returns: True/False
1053    """
1054    global _sessions
1055    _s = _sessions.session if session is None else session
1056    _r = _s.session.region_name if region is None else region
1057    loggy.info(f"aws.ecs_wait_services_stable(): BEGIN (using session named: {_s.name})")
1058
1059    loggy.info(f"aws.ecs_wait_services_stable(): Waiting for services to become stable on cluster ({cluster} / service ({service}).")
1060
1061    try:
1062        client = _s.session.client('ecs', region_name=_r)
1063        waiter = client.get_waiter('services_stable')
1064
1065        waiter.wait(
1066            cluster=cluster,
1067            services=[
1068                service
1069            ])
1070    except Exception as e:
1071        loggy.info(f"aws.ecs_wait_services_stable(): The services did not become stable: {str(e)}")
1072        raise
1073        return False
1074
1075    return True

ecs_wait_services_stable()

Wait up to 10 minutes for ECS services to become stable

cluster: String containing cluster arn service: String containing service arn

Returns: True/False

def ecs_deregister_task_def( task_def_arn: str, session: Union[utils.aws.AwsSession, NoneType] = None, region: Union[str, NoneType] = None) -> bool:
1078def ecs_deregister_task_def(task_def_arn: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> bool:
1079    """
1080    ecs_deregister_task_def()
1081
1082    Deregister a task definition
1083
1084    task_def_arn: String containing task definition arn
1085
1086    Returns: True/False
1087    """
1088    global _sessions
1089    _s = _sessions.session if session is None else session
1090    _r = _s.session.region_name if region is None else region
1091    loggy.info(f"aws.ecs_deregister_task_def(): BEGIN (using session named: {_s.name})")
1092
1093    loggy.info(f"aws.ecs_deregister_task_def(): Deregistering task definition: {task_def_arn}")
1094
1095    try:
1096        client = _s.session.client('ecs', region_name=_r)
1097        response = client.deregister_task_definition(
1098            taskDefinition=task_def_arn
1099        )
1100        if not response.get('taskDefinition') or not response['taskDefinition'].get('deregisteredAt'):
1101            raise Exception
1102    except Exception as e:
1103        loggy.info(f"aws.ecs_deregister_task_def(): Failed to deregister task definition: {str(e)}")
1104        return False
1105
1106    return True

ecs_deregister_task_def()

Deregister a task definition

task_def_arn: String containing task definition arn

Returns: True/False

def ecr_get_manifest( container: str, tag: str, session: Union[utils.aws.AwsSession, NoneType] = None, region: Union[str, NoneType] = None) -> str:
1114def ecr_get_manifest(container: str, tag: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> str:
1115    """
1116    ecr_get_manifest()
1117
1118    Get the image manifest for a remote container and tag
1119
1120    container: String representing container name
1121    tag: String tag to look up
1122    session: will use a different session to build the client, default is build session
1123    region: will use a different region to build the client, default is build region
1124
1125    Returns: String containing image manifest
1126    """
1127    global _sessions
1128    _s = _sessions.session if session is None else session
1129    _r = _s.session.region_name if region is None else region
1130    loggy.info(f"aws.ecr_get_manifest(): BEGIN (using session named: {_s.name})")
1131
1132    client = _s.session.client('ecr', region_name=_r)
1133
1134    manifest = None
1135    try:
1136        response = client.batch_get_image(
1137            repositoryName=ecr_strip_container_name(container),
1138            imageIds=[
1139                {
1140                    'imageTag': tag
1141                }
1142            ]
1143            # acceptedMediaTypes=['application/vnd.docker.distribution.manifest.v2+json'],
1144        )
1145        loggy.info(str(response))
1146        manifest = response['images'][0]['imageManifest']
1147        loggy.info(f"aws.ecr_get_manifest(): Found manifest {manifest}.")
1148    except Exception as e:
1149        loggy.info(f"aws.ecr_get_manifest(): Failed to retrieve manifest: {str(e)}")
1150        return manifest
1151
1152    return manifest

ecr_get_manifest()

Get the image manifest for a remote container and tag

container: String representing container name tag: String tag to look up session: will use a different session to build the client, default is build session region: will use a different region to build the client, default is build region

Returns: String containing image manifest

def ecr_put_image( container: str, tag: str, manifest: str, session: Union[utils.aws.AwsSession, NoneType] = None, region: Union[str, NoneType] = None) -> bool:
1155def ecr_put_image(container: str, tag: str, manifest: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> bool:
1156    """
1157    ecr_put_image()
1158
1159    Put an image manifest for a remote container and tag. Used for re-tagging an
1160    existing image
1161
1162    container: String representing container name
1163    tag: String new tag to add to image
1164    manifest: String manifest of image
1165    session: will use a different session to build the client, default is build session
1166    region: will use a different region to build the client, default is build region
1167
1168    Returns: True/False
1169    """
1170    global _sessions
1171    _s = _sessions.session if session is None else session
1172    _r = _s.session.region_name if region is None else region
1173    loggy.info(f"aws.ecr_put_image(): BEGIN (using session named: {_s.name})")
1174
1175    client = _s.session.client('ecr', region_name=_r)
1176
1177    try:
1178        client.put_image(
1179            repositoryName=ecr_strip_container_name(container),
1180            imageTag=tag,
1181            imageManifest=manifest
1182        )
1183        # loggy.info(str(response))
1184        loggy.info("aws.ecr_put_image(): Successfully put new image.")
1185    except Exception as e:
1186        if 'already exists' in str(e):
1187            loggy.info(f"aws.ecr_put_image(): Image already exists. Passing. {str(e)}")
1188            return True
1189        else:
1190            loggy.info(f"aws.ecr_put_image(): Failed to put image: {str(e)}")
1191            return False
1192
1193    return True

ecr_put_image()

Put an image manifest for a remote container and tag. Used for re-tagging an existing image

container: String representing container name tag: String new tag to add to image manifest: String manifest of image session: will use a different session to build the client, default is build session region: will use a different region to build the client, default is build region

Returns: True/False

def ecr_tag_to_build(container: str, tag_list: list) -> bool:
1196def ecr_tag_to_build(container: str, tag_list: list) -> bool:
1197    """
1198    ecr_tag_to_build()
1199
1200    Add a Tag to an existing remote ecr container.
1201    Example: ecr_tag(container="123456789.dkr.ecr.us-east-1.amazonaws.com/mirrored/timothy:1234", tag="latest")
1202
1203    This is the default and only path to get a container pushed into production.
1204
1205    NOTE 1: You will be logged out of any existing ECR repos. run `aws.ecr_login()` to establish a new connection
1206
1207    container: String containing existing remote container with tag "container:tag"
1208    tag_list: List containing new tags to add to the local container
1209
1210    Returns: True/False
1211    """
1212    global _sessions
1213    loggy.info(f"aws.ecr_tag_to_build(): BEGIN (using session named: {_sessions.name})")
1214    return _sessions.ecr_tag_to_build(container=container, tag_list=tag_list)

ecr_tag_to_build()

Add a Tag to an existing remote ecr container. Example: ecr_tag(container="123456789.dkr.ecr.us-east-1.amazonaws.com/mirrored/timothy:1234", tag="latest")

This is the default and only path to get a container pushed into production.

NOTE 1: You will be logged out of any existing ECR repos. run aws.ecr_login() to establish a new connection

container: String containing existing remote container with tag "container:tag" tag_list: List containing new tags to add to the local container

Returns: True/False

def ecr_tag( container: str, tag: str, session: Union[utils.aws.AwsSession, NoneType] = None, region: Union[str, NoneType] = None) -> bool:
1217def ecr_tag(container: str, tag: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> bool:
1218    """
1219    ecr_tag()
1220
1221    Add a Tag to an existing remote ecr container.
1222    Example: ecr_tag(container="123456789.dkr.ecr.us-east-1.amazonaws.com/mirrored/timothy:1234", tag="latest")
1223
1224    container: String containing existing remote container with tag "container:tag"
1225    tag: String containing new tag to add to the local container
1226
1227    Returns: True/False
1228    """
1229    global _sessions
1230    _s = _sessions.session if session is None else session
1231    _r = ecr_get_region() if session is None else _s.session.region_name if region is None else region
1232    loggy.info(f"aws.ecr_tag(): BEGIN (using session named: {_s.name})")
1233
1234    loggy.info(f"aws.ecr_tag(): Tagging {container} with {tag}")
1235    if ':' not in container:
1236        raise Exception("aws.ecr_tag(): container must include tag")
1237
1238    _c, _t = ecr_generate_fqcn(container=container, session=_s, region=_r)
1239
1240    manifest = ecr_get_manifest(container=_c, tag=_t, session=_s, region=_r)
1241
1242    if ecr_put_image(container=_c, tag=tag, manifest=manifest, session=_s, region=_r):
1243        loggy.info("aws.ecr_tag(): Successfully added tag to image")
1244    else:
1245        loggy.info("aws.ecr_tag(): Failed to add new tag to image")
1246        return False
1247
1248    return True

ecr_tag()

Add a Tag to an existing remote ecr container. Example: ecr_tag(container="123456789.dkr.ecr.us-east-1.amazonaws.com/mirrored/timothy:1234", tag="latest")

container: String containing existing remote container with tag "container:tag" tag: String containing new tag to add to the local container

Returns: True/False

def ecr_promote_to_gov( container: str, tag: str, push_all_tags: Union[bool, NoneType] = True) -> bool:
1251def ecr_promote_to_gov(container: str, tag: str, push_all_tags: typing.Optional[bool] = True) -> bool:
1252    """
1253    ecr_promote_to_gov()
1254
1255    Pull container from ECR Commercial and Push it into ECR GovCloud.
1256    NOTE: This will ONLY work from a gov pipeline!
1257
1258    container: String containing existing remote container with tag "container"
1259    tag: String containing existing tag for container
1260    push_all_tags: bool True/False defaults to True to push all tags for this container:tag pair
1261
1262    Returns: True/False
1263    """
1264    global _sessions
1265    loggy.info(f"aws.ecr_promote_to_gov(): BEGIN (using session named: {_sessions.name})")
1266
1267    return _sessions.ecr_promote_to_gov(container, tag, push_all_tags)

ecr_promote_to_gov()

Pull container from ECR Commercial and Push it into ECR GovCloud. NOTE: This will ONLY work from a gov pipeline!

container: String containing existing remote container with tag "container" tag: String containing existing tag for container push_all_tags: bool True/False defaults to True to push all tags for this container:tag pair

Returns: True/False

def ecr_push_to_build(container: str, tag: str, tag_list: Union[list, NoneType] = None) -> bool:
1270def ecr_push_to_build(container: str, tag: str, tag_list: typing.Optional[list] = None) -> bool:
1271    """
1272    ecr_push_to_build()
1273
1274    Push a locally built container into our AWS build commercial account.
1275    This is the default and only path to get a container pushed into production.
1276
1277    NOTE 1: You must be running in either `develop` or `hotfix` branch.
1278    NOTE 2: You will be logged out of any existing ECR repos. run `aws.ecr_login()` to establish a new connection
1279
1280    container: String representing container name
1281    tag: String with existing local tag for container name
1282    tag_list: Optional String List of additional tags to push with the container
1283
1284    Returns: True/False
1285    """
1286    global _sessions
1287    loggy.info(f"aws.ecr_tag(): BEGIN (using session named: {_sessions.name})")
1288
1289    return _sessions.ecr_push_and_scan(container, tag, tag_list)

ecr_push_to_build()

Push a locally built container into our AWS build commercial account. This is the default and only path to get a container pushed into production.

NOTE 1: You must be running in either develop or hotfix branch. NOTE 2: You will be logged out of any existing ECR repos. run aws.ecr_login() to establish a new connection

container: String representing container name tag: String with existing local tag for container name tag_list: Optional String List of additional tags to push with the container

Returns: True/False

def ecr_get_registry_id(session: Union[utils.aws.AwsSession, NoneType] = None) -> str:
1292def ecr_get_registry_id(session: typing.Optional[AwsSession] = None) -> str:
1293    """
1294    ecr_get_registry_id()
1295
1296    Returns default registry_id (i.e. build account) unless a session is given.
1297
1298    session: AwsSession object to pull the registry_id from a session
1299    """
1300    if session:
1301        loggy.info(f"aws.ecr_get_registry_id(): BEGIN (using session named: {session.name})")
1302
1303    _registry_id = ECR_BASE_COM['registry_id'] if session is None else get_aws_account_id(session)
1304    return _registry_id

ecr_get_registry_id()

Returns default registry_id (i.e. build account) unless a session is given.

session: AwsSession object to pull the registry_id from a session

def ecr_get_region(session: Union[utils.aws.AwsSession, NoneType] = None) -> str:
1307def ecr_get_region(session: typing.Optional[AwsSession] = None) -> str:
1308    """
1309    ecr_get_region()
1310
1311    Returns default registry region (i.e. build account) unless a session is given.
1312
1313    session: (Optional) String to pull the region from a session
1314    """
1315    if session:
1316        loggy.info(f"aws.ecr_get_region(): BEGIN (using session named: {session.name})")
1317
1318    _region = ECR_BASE_COM['region'] if session is None else get_region(session)
1319    return _region

ecr_get_region()

Returns default registry region (i.e. build account) unless a session is given.

session: (Optional) String to pull the region from a session

def ecr_describe_image( container: str, tag: str, session: Union[utils.aws.AwsSession, NoneType] = None, region: Union[str, NoneType] = None) -> dict:
1322def ecr_describe_image(container: str, tag: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> dict:
1323    """
1324    ecr_describe_image()
1325
1326    Returns a dict with the response from aws call to describe images
1327
1328    container: String of container
1329    tag: String of container tag
1330    session: will use this session to build the client, default is _sessions
1331    region: will use a specfic region to build the client, default is _sessions.region_name
1332    """
1333    global _sessions
1334    _s = _sessions.session if session is None else session
1335    _r = _s.session.region_name if region is None else region
1336    loggy.info(f"aws.ecr_describe_image(): BEGIN (using session named: {_s.name})")
1337
1338    client = _s.session.client('ecr', region_name=_r)
1339
1340    try:
1341        response = client.describe_images(
1342            repositoryName=ecr_strip_container_name(container),
1343            imageIds=[
1344                {
1345                    'imageTag': tag
1346                }
1347            ]
1348        )
1349        # loggy.info(str(response))
1350        loggy.info("aws.ecr_describe_image(): Successfully pulled image information.")
1351    except Exception as e:
1352        loggy.info(f"aws.ecr_describe_image(): Failed to pull image information: {str(e)}")
1353        return {}
1354
1355    if not response.get('imageDetails'):
1356        loggy.info(f"aws.ecr_describe_image(): Failed to pull image information. {str(response)}")
1357        return {}
1358
1359    return response

ecr_describe_image()

Returns a dict with the response from aws call to describe images

container: String of container tag: String of container tag session: will use this session to build the client, default is _sessions region: will use a specfic region to build the client, default is _sessions.region_name

def ecr_logout( registry_id: Union[str, NoneType] = None, session: Union[utils.aws.AwsSession, NoneType] = None, region: Union[str, NoneType] = None) -> None:
1362def ecr_logout(registry_id: typing.Optional[str] = None, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> None:
1363    """
1364    ecr_logout()
1365
1366    Logout from an ECR repo
1367
1368    registry_id: Authenticate to a different registry_id (aka AWS ECR)
1369    session: will use this session to build the client, default is _sessions
1370    region: will use a specfic region to build the client, default is _sessions.region_name
1371    """
1372    global _sessions
1373
1374    loggy.info("aws.ecr_logout(): BEGIN")
1375
1376    _s = _sessions.session if session is None else session
1377    _r = _s.session.region_name if region is None else region
1378    _reg = get_aws_account_id(_s) if registry_id is None else registry_id
1379    _repo = f"{_reg}.dkr.ecr.{_r}.amazonaws.com"
1380
1381    docker.logout(repo=_repo)
1382    return

ecr_logout()

Logout from an ECR repo

registry_id: Authenticate to a different registry_id (aka AWS ECR) session: will use this session to build the client, default is _sessions region: will use a specfic region to build the client, default is _sessions.region_name

def ecr_login( registry_id: Union[str, NoneType] = None, session: Union[utils.aws.AwsSession, NoneType] = None, region: Union[str, NoneType] = None) -> bool:
1385def ecr_login(registry_id: typing.Optional[str] = None, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> bool:
1386    """
1387    ecr_login()
1388
1389    Authenticate Docker against an ECR repository.
1390
1391    Default: Authenticate using _sessions (current environment) to pull/push there.
1392
1393    registry_id: Authenticate to a different registry_id (aka AWS ECR)
1394    session: will use this session to build the client, default is _sessions
1395    region: will use a specfic region to build the client, default is _sessions.region_name
1396
1397    Returns: True/False
1398    """
1399    global _sessions
1400
1401    loggy.info("aws.ecr_login(): BEGIN")
1402
1403    _s = _sessions.session if session is None else session
1404    _r = _s.session.region_name if region is None else region
1405    _reg = get_aws_account_id(_s) if registry_id is None else registry_id
1406
1407    loggy.info(f"aws.ecr_login(): BEGIN (using session named: {_s.name})")
1408
1409    try:
1410        loggy.info(f"aws.ecr_login(): registry_id ({_reg}) region ({_r})")
1411        client = _s.session.client('ecr', region_name=_r)
1412        response = client.get_authorization_token(registryIds=[_reg])
1413    except Exception as e:
1414        loggy.error("Error: " + str(e))
1415        raise
1416
1417    try:
1418        auth = response["authorizationData"][0]
1419    except (IndexError, KeyError):
1420        raise RuntimeError("Unable to get authorization token from ECR!")
1421    except Exception as e:
1422        loggy.error("Error: " + str(e))
1423        raise
1424
1425    auth_token = base64.b64decode(auth["authorizationToken"]).decode()
1426    username, password = auth_token.split(":")
1427
1428    # cmd = [
1429    #     "login",
1430    #     "--username",
1431    #     username,
1432    #     "--password",
1433    #     password,
1434    #     auth["proxyEndpoint"],
1435    # ]
1436
1437    return docker.login(username=username, password=password, repo=auth["proxyEndpoint"])

ecr_login()

Authenticate Docker against an ECR repository.

Default: Authenticate using _sessions (current environment) to pull/push there.

registry_id: Authenticate to a different registry_id (aka AWS ECR) session: will use this session to build the client, default is _sessions region: will use a specfic region to build the client, default is _sessions.region_name

Returns: True/False

def ecr_generate_build_fqcn(container: str) -> [, ]:
1440def ecr_generate_build_fqcn(container: str) -> [str, str]:
1441    """
1442    ecr_generate_build_fqcn()
1443
1444    Generate a fully qualified aws docker container string for build account
1445    (i.e. 552324424.dkr.ecr.us-east-1.amazonaws.com/mirrored/timothy)
1446
1447    container: String representing container name:tag
1448
1449    Returns: String container and String tag (None if tag doesn't exist)
1450    """
1451    global _sessions
1452    loggy.info(f"aws.ecr_generate_build_fqcn(): BEGIN (using session named: {_sessions.name})")
1453    return _sessions.ecr_generate_fqcn(container=container)

ecr_generate_build_fqcn()

Generate a fully qualified aws docker container string for build account (i.e. 552324424.dkr.ecr.us-east-1.amazonaws.com/mirrored/timothy)

container: String representing container name:tag

Returns: String container and String tag (None if tag doesn't exist)

def ecr_generate_fqcn( container: str, session: Union[utils.aws.AwsSession, NoneType] = None, region: Union[str, NoneType] = None) -> [, ]:
1456def ecr_generate_fqcn(container: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> [str, str]:
1457    """
1458    ecr_generate_fqcn()
1459
1460    Generate a fully qualified aws docker container string based on current session Credentials
1461    (i.e. 552324424.dkr.ecr.us-east-1.amazonaws.com/mirrored/timothy)
1462
1463    container: String representing container name:tag
1464
1465    Returns: String container and String tag (None if tag doesn't exist)
1466    """
1467    global _sessions
1468
1469    if "dkr.ecr" in container:
1470        loggy.info(f"aws.ecr_generate_fqcn(): ECR URL already exists: {container}. Stripping container and creating a new ECR URL")
1471        container = ecr_strip_container_name(container=container)
1472        # return container.split(':')
1473
1474    _s = _sessions.session if session is None else session
1475    _r = ecr_get_region() if session is None else _s.session.region_name if region is None else region
1476    _reg = ecr_get_registry_id(_s)
1477
1478    loggy.info(f"aws.ecr_generate_fqcn(): BEGIN (using session named: {_s.name})")
1479
1480    loggy.info(f"aws.ecr_generate_fqcn(): Generated ECR URL: {_reg}.dkr.ecr.{_r}.amazonaws.com/{container}")
1481
1482    _ret = f"{_reg}.dkr.ecr.{_r}.amazonaws.com/{container}".split(':')
1483    if len(_ret) < 2:
1484        return _ret[0], None
1485
1486    return _ret[0], _ret[1]

ecr_generate_fqcn()

Generate a fully qualified aws docker container string based on current session Credentials (i.e. 552324424.dkr.ecr.us-east-1.amazonaws.com/mirrored/timothy)

container: String representing container name:tag

Returns: String container and String tag (None if tag doesn't exist)

def ecr_pull_from_build( container: str, tag: Union[str, NoneType] = None) -> [, ]:
1489def ecr_pull_from_build(container: str, tag: typing.Optional[str] = None) -> [str, str]:
1490    """
1491    ecr_pull_from_build()
1492
1493    Run a docker pull for a container:tag
1494
1495    container: String representing container name ("name:tag" is accepted)
1496    tag: String representing the tag of the container to pull (Optional)
1497
1498    NOTE: if tag is None and container does not include :tag in the String, we
1499    will append :latest to the container name
1500
1501    Returns: Tuple [str, str] First str is full ECR name of container, second str is tag
1502    """
1503    global _sessions
1504    loggy.info(f"aws.ecr_pull_from_build(): BEGIN (using session named: {_sessions.name})")
1505    return _sessions.ecr_pull_from_build(container=container, tag=tag)

ecr_pull_from_build()

Run a docker pull for a container:tag

container: String representing container name ("name:tag" is accepted) tag: String representing the tag of the container to pull (Optional)

NOTE: if tag is None and container does not include :tag in the String, we will append :latest to the container name

Returns: Tuple [str, str] First str is full ECR name of container, second str is tag

def ecr_pull( container: str, tag: Union[str, NoneType] = None, session: Union[utils.aws.AwsSession, NoneType] = None, region: Union[str, NoneType] = None) -> [, ]:
1508def ecr_pull(container: str, tag: typing.Optional[str] = None, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> [str, str]:
1509    """
1510    ecr_pull()
1511
1512    Run a docker pull for a container:tag
1513    Ensure you run aws.ecr_login() first
1514
1515    container: String representing container name ("name:tag" is accepted)
1516    tag: String representing the tag of the container to pull (Optional)
1517
1518    NOTE: if tag is None and container does not include :tag in the String, we
1519    will append :latest to the container name
1520
1521    Returns: Tuple [str, str] First str is full ECR name of container, second str is tag
1522    """
1523    # global _sessions
1524    # _s = _sessions.session if session is None else session
1525    #
1526    # ecr_login(_s)
1527    # _container = container if ':' in container else f"{container}:latest" if tag is None else f"{container}:{tag}"
1528
1529    _c, _t = ecr_generate_fqcn(container=container, session=session, region=region)
1530    _t = tag if tag is not None else 'latest' if _t is None else _t
1531
1532    loggy.info(f"aws.ecr_pull(): Pulling container: {_c}:{_t}")
1533
1534    if not docker.docker("pull", f"{_c}:{_t}"):
1535        return None, None
1536
1537    return _c, _t

ecr_pull()

Run a docker pull for a container:tag Ensure you run aws.ecr_login() first

container: String representing container name ("name:tag" is accepted) tag: String representing the tag of the container to pull (Optional)

NOTE: if tag is None and container does not include :tag in the String, we will append :latest to the container name

Returns: Tuple [str, str] First str is full ECR name of container, second str is tag

def ecr_check_tag_exists( container: str, tag_list: list, session: Union[utils.aws.AwsSession, NoneType] = None, region: Union[str, NoneType] = None) -> list:
1540def ecr_check_tag_exists(container: str, tag_list: list, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> list:
1541    """
1542    ecr_check_tag_exists()
1543
1544    Check if tags exist for a container
1545
1546    container: String representing container name
1547    tag: String List of tags to check
1548    session: will use a different session to build the client, default is build session
1549    region: will use a different region to build the client, default is build region
1550
1551    Returns: String List of tags that are not found. Will return empty List on failure. (i.e. Fail Open)
1552    """
1553    global _sessions
1554    _s = _sessions.session if session is None else session
1555    _r = _s.session.region_name if region is None else region
1556
1557    loggy.info(f"aws.ecr_check_tag_exists(): BEGIN (using session named: {_s.name})")
1558
1559    client = _s.session.client('ecr', region_name=_r)
1560
1561    missing = tag_list
1562    try:
1563        response = client.describe_images(
1564            repositoryName=ecr_strip_container_name(container),
1565            filter={
1566                'tagStatus': 'TAGGED'
1567            }
1568        )
1569        loggy.info(str(response))
1570        for i in response['imageDetails']:
1571            for tag in tag_list:
1572                if tag in i['imageTags']:
1573                    loggy.info(f"aws.ecr_check_tag_exists(): Found tag {tag}. Removing from List.")
1574                    missing.remove(tag)
1575    except Exception as e:
1576        loggy.info(f"aws.ecr_check_tag_exists(): Failed to retrieve tags: {str(e)}")
1577        loggy.info("aws.ecr_check_tag_exists(): FAILING OPEN! Returning empty List.")
1578        return []
1579
1580    return missing

ecr_check_tag_exists()

Check if tags exist for a container

container: String representing container name tag: String List of tags to check session: will use a different session to build the client, default is build session region: will use a different region to build the client, default is build region

Returns: String List of tags that are not found. Will return empty List on failure. (i.e. Fail Open)

def ecr_push( container: str, tag: str, tag_list: Union[list, NoneType] = None, session: Union[utils.aws.AwsSession, NoneType] = None, region: Union[str, NoneType] = None) -> bool:
1583def ecr_push(container: str, tag: str, tag_list: typing.Optional[list] = None, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> bool:
1584    """
1585    ecr_push()
1586
1587    Push a new container to build account ECR
1588
1589    container: String representing container name
1590    tag: String with existing local tag for container name
1591    tag_list: Optional String List of additional tags to push with the container
1592    session: will use a different session to build the client, default is build session
1593
1594    Returns: True/False
1595    """
1596    global _sessions
1597    loggy.info(f"aws.ecr_push(): BEGIN (using session named: {_sessions.name})")
1598
1599    return _sessions.ecr_push(container, tag, tag_list, session, region)

ecr_push()

Push a new container to build account ECR

container: String representing container name tag: String with existing local tag for container name tag_list: Optional String List of additional tags to push with the container session: will use a different session to build the client, default is build session

Returns: True/False

def ecr_strip_container_name(container: str) -> str:
1602def ecr_strip_container_name(container: str) -> str:
1603    """"
1604    ecr_strip_container_name()
1605
1606    Remove prefix of repo url from the repo name itself
1607
1608    container: String like "575815261832.dkr.ecr.******.amazonaws.com/mirrored/amazoncorretto"
1609
1610    Returns: String containing just container like "mirrored/amazoncorretto"
1611    """
1612    _new = container.split('/')
1613    del _new[0]
1614    return '/'.join(_new)

" ecr_strip_container_name()

Remove prefix of repo url from the repo name itself

container: String like "575815261832.dkr.ecr.**.amazonaws.com/mirrored/amazoncorretto"

Returns: String containing just container like "mirrored/amazoncorretto"

def ecr_scan( container: str, tag: str, session: Union[utils.aws.AwsSession, NoneType] = None, region: Union[str, NoneType] = None) -> bool:
1617def ecr_scan(container: str, tag: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> bool:
1618    """
1619    ecr_scan()
1620
1621    Scan and wait for results of a container
1622
1623    container: String representing container name
1624    tag: String tags to scan
1625    session: will use a different to build the client, default is build session
1626
1627    Returns: True/False
1628    """
1629    global _sessions
1630    _s = _sessions.session if session is None else session
1631    _r = _s.session.region_name if region is None else region
1632
1633    loggy.info(f"aws.ecr_scan(): BEGIN (using session named: {_s.name})")
1634
1635    client = _s.session.client('ecr', region_name=_r)
1636    res = "IN_PROGRESS"
1637    _wait = 300
1638    try:
1639        while 'IN_PROGRESS' in res and _wait > 0:
1640            loggy.info(f"aws.ecr_scan(): Waiting up to 300 seconds for the ECR Scan of {container} with tag {tag} to complete.")
1641            response = client.describe_image_scan_findings(
1642                repositoryName=ecr_strip_container_name(container),
1643                imageId={
1644                    'imageTag': tag
1645                }
1646            )
1647            time.sleep(5)
1648            _wait -= 5
1649            res = response['imageScanStatus']['status']
1650
1651        loggy.info(f"aws.ecr_scan(): Scan completed with: {res}")
1652    except Exception as e:
1653        loggy.info(f"aws.ecr_scan(): Failed to get the scan status: {str(e)}")
1654        return False
1655
1656    """
1657    TODO: Check the scan for vulns
1658    """
1659
1660    return True

ecr_scan()

Scan and wait for results of a container

container: String representing container name tag: String tags to scan session: will use a different to build the client, default is build session

Returns: True/False

def ssm_get_parameter_from_build( name: str, session: Union[utils.aws.AwsSession, NoneType] = None, region: Union[str, NoneType] = None) -> str:
1668def ssm_get_parameter_from_build(name: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> str:
1669    """
1670    ssm_get_parameter_from_build()
1671
1672    Get an SSM Parameter Value from the build account.
1673
1674    name: String containing param friendly name or arn. Will convert arn into friendly name before use.
1675    session: aws.Sessions() will use a different session to build the client, default is _sessions
1676    region: String defaults to AWS_DEFAULT_REGION or us-east-1
1677
1678    Returns String containing param value
1679    """
1680    global _sessions
1681    loggy.info(f"aws.ssm_get_parameter_from_build(): BEGIN (using session named: {_sessions.name})")
1682    return _sessions.ssm_get_parameter(name, session, region)

ssm_get_parameter_from_build()

Get an SSM Parameter Value from the build account.

name: String containing param friendly name or arn. Will convert arn into friendly name before use. session: aws.Sessions() will use a different session to build the client, default is _sessions region: String defaults to AWS_DEFAULT_REGION or us-east-1

Returns String containing param value

def ssm_get_parameter( name: str, session: Union[utils.aws.AwsSession, NoneType] = None, region: Union[str, NoneType] = None) -> str:
1686def ssm_get_parameter(name: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> str:
1687    """
1688    ssm_get_parameter()
1689
1690    Get an SSM Parameter Value
1691
1692    name: String containing param friendly name or arn. Will convert arn into friendly name before use.
1693    region: String defaults to AWS_DEFAULT_REGION or us-east-1
1694    session: aws.Sessions() will use a different session to build the client, default is _sessions
1695
1696    Returns String containing param value
1697    """
1698    global _sessions
1699    _s = _sessions.session if session is None else session
1700    _r = _s.session.region_name if region is None else region
1701
1702    loggy.info(f"aws.ssm_get_parameter(): BEGIN (using session named: {_s.name})")
1703
1704    """
1705    This function only takes a name not an arn
1706    """
1707    name = name if ':parameter' not in name else name.split(':parameter')[1]
1708
1709    try:
1710        client = _s.session.client(service_name='ssm', region_name=_r)
1711        response = client.get_parameter(Name=name, WithDecryption=True)
1712
1713        return response['Parameter']['Value']
1714    except Exception as e:
1715        loggy.error("Error: " + str(e))
1716
1717    return None

ssm_get_parameter()

Get an SSM Parameter Value

name: String containing param friendly name or arn. Will convert arn into friendly name before use. region: String defaults to AWS_DEFAULT_REGION or us-east-1 session: aws.Sessions() will use a different session to build the client, default is _sessions

Returns String containing param value

def ssm_put_parameter( name: str, value: str, type: Union[str, NoneType] = None, session: Union[utils.aws.AwsSession, NoneType] = None, region: Union[str, NoneType] = None, KeyId: Union[str, NoneType] = None) -> str:
1720def ssm_put_parameter(name: str,
1721                      value: str,
1722                      type: typing.Optional[str] = None,
1723                      session: typing.Optional[AwsSession] = None,
1724                      region: typing.Optional[str] = None,
1725                      KeyId: typing.Optional[str] = None) -> str:
1726    """
1727    ssm_put_parameter()
1728
1729    Create/Update an SSM Parameter
1730
1731    name and value are required
1732    type defaults to String
1733    region defaults to AWS_DEFAULT_REGION or us-east-1
1734    session will use a different session to build the client, default is _sessions
1735    KeyId is a string containing the KeyId for the encryption key to use if not default
1736
1737    Returns: String of new ssm param version
1738    """
1739    global _sessions
1740    _s = _sessions.session if session is None else session
1741    _r = _s.session.region_name if region is None else region
1742
1743    loggy.info(f"aws.ssm_put_parameter(): BEGIN (using session named: {_s.name})")
1744
1745    if not type:
1746        type = "String"
1747
1748    """
1749    This function only takes a name not an arn
1750    """
1751    name = name if ':parameter' not in name else name.split(':parameter')[1]
1752
1753    try:
1754        client = _s.session.client(service_name='ssm', region_name=_r)
1755        if KeyId:
1756            response = client.put_parameter(Name=name, Value=value, Type=type, Overwrite=True, KeyId=KeyId)
1757        else:
1758            response = client.put_parameter(Name=name, Value=value, Type=type, Overwrite=True)
1759
1760        return str(response['Version'])
1761    except Exception as e:
1762        loggy.error("Error: " + str(e))
1763
1764    return None

ssm_put_parameter()

Create/Update an SSM Parameter

name and value are required type defaults to String region defaults to AWS_DEFAULT_REGION or us-east-1 session will use a different session to build the client, default is _sessions KeyId is a string containing the KeyId for the encryption key to use if not default

Returns: String of new ssm param version

def secrets_get_secret_string( name: str, session: Union[utils.aws.AwsSession, NoneType] = None, region: Union[str, NoneType] = None) -> dict:
1772def secrets_get_secret_string(name: str, session: typing.Optional[AwsSession] = None, region: typing.Optional[str] = None) -> dict:
1773    """
1774    secrets_get_secret_string()
1775
1776    Retrieve sectret string value using ENV Variables as Credentials.
1777    This will retrieve the AWSCURRENT version.
1778
1779    name is required
1780    region defaults to AWS_DEFAULT_REGION or us-east-1
1781    session will use a different session to build the client, default is _sessions
1782
1783    Returns: dict containing secret string
1784    """
1785    global _sessions
1786    _s = _sessions.session if session is None else session
1787    _r = _s.session.region_name if region is None else region
1788
1789    loggy.info(f"aws.secrets_get_secret_string(): BEGIN (using session named: {_s.name})")
1790    loggy.info(f"aws.secrets_get_secret_string(): region name {_r}")
1791    try:
1792        client = _s.session.client(service_name='secretsmanager', region_name=_r)
1793
1794        response = client.get_secret_value(SecretId=name)
1795
1796        return json.loads(response['SecretString'])
1797    except Exception as e:
1798        loggy.error("Error: " + str(e))
1799        raise
1800
1801    return {}

secrets_get_secret_string()

Retrieve sectret string value using ENV Variables as Credentials. This will retrieve the AWSCURRENT version.

name is required region defaults to AWS_DEFAULT_REGION or us-east-1 session will use a different session to build the client, default is _sessions

Returns: dict containing secret string