Source code for certbot.cert_manager

"""Tools for managing certificates."""
import datetime
import logging
import re
import traceback

import pytz
import zope.component

from acme.magic_typing import List  # pylint: disable=unused-import, no-name-in-module

from certbot import crypto_util
from certbot import errors
from certbot import interfaces
from certbot import ocsp
from certbot import storage
from certbot import util
from certbot.compat import os
from certbot.display import util as display_util

logger = logging.getLogger(__name__)

# Commands

def rename_lineage(config):
    """Give an existing certificate lineage a new name.

    The lineage to rename is obtained from :func:`get_certnames`. The new
    name is read from ``config.new_certname``; when that is empty the user
    is prompted for it.

    :param config: Configuration.
    :type config: :class:`certbot.configuration.NamespaceConfig`

    :raises errors.Error: if the prompt is not confirmed or yields an
        empty name.
    :raises errors.ConfigurationError: if no lineage can be loaded for the
        selected certificate name.

    """
    display = zope.component.getUtility(interfaces.IDisplay)

    old_name = get_certnames(config, "rename")[0]
    target_name = config.new_certname
    if not target_name:
        question = "Enter the new name for certificate {0}".format(old_name)
        code, target_name = display.input(
            question, flag="--updated-cert-name", force_interactive=True)
        if not (code == display_util.OK and target_name):
            raise errors.Error("User ended interaction.")

    # The existence check happens only after the new name is known.
    if not lineage_for_certname(config, old_name):
        raise errors.ConfigurationError(
            "No existing certificate with name {0} found.".format(old_name))

    storage.rename_renewal_config(old_name, target_name, config)
    summary = "Successfully renamed {0} to {1}.".format(old_name, target_name)
    display.notification(summary, pause=False)
def certificates(config):
    """Show information about every certificate Certbot has configured.

    Each renewal configuration file is loaded and verified. Files that
    raise any exception are logged, remembered as failures and skipped.
    Both lists are then handed to :func:`_describe_certs`.

    :param config: Configuration.
    :type config: :class:`certbot.configuration.NamespaceConfig`

    """
    usable = []
    unusable = []
    for conf_path in storage.renewal_conf_files(config):
        try:
            lineage = storage.RenewableCert(conf_path, config)
            crypto_util.verify_renewable_cert(lineage)
        except Exception as exc:  # pylint: disable=broad-except
            # One bad file must not prevent the others from being listed.
            logger.warning("Renewal configuration file %s produced an "
                           "unexpected error: %s. Skipping.", conf_path, exc)
            logger.debug("Traceback was:\n%s", traceback.format_exc())
            unusable.append(conf_path)
        else:
            usable.append(lineage)

    # Describe all the certs
    _describe_certs(config, usable, unusable)
def delete(config):
    """Remove the Certbot files belonging to one or more lineages.

    The lineages are chosen through :func:`get_certnames` (multiple
    selections allowed); a notification is shown after each deletion.

    :param config: Configuration.

    """
    for name in get_certnames(config, "delete", allow_multiple=True):
        storage.delete_files(config, name)
        display = zope.component.getUtility(interfaces.IDisplay)
        message = "Deleted all files relating to certificate {0}.".format(name)
        display.notification(message, pause=False)
################### # Public Helpers ###################
def lineage_for_certname(cli_config, certname):
    """Look up the lineage object stored under ``certname``.

    :param cli_config: Configuration providing ``renewal_configs_dir``.
    :param str certname: Name of the lineage to load.

    :returns: the loaded `storage.RenewableCert`, or `None` when the
        renewal file cannot be located or fails to load.

    """
    # Verify the directory is there
    util.make_or_verify_dir(cli_config.renewal_configs_dir, mode=0o755)

    try:
        conf_path = storage.renewal_file_for_certname(cli_config, certname)
    except errors.CertStorageError:
        return None

    found = None
    try:
        found = storage.RenewableCert(conf_path, cli_config)
    except (errors.CertStorageError, IOError):
        logger.debug("Renewal conf file %s is broken.", conf_path)
        logger.debug("Traceback was:\n%s", traceback.format_exc())
    return found
def domains_for_certname(config, certname):
    """Return the names covered by the cert called ``certname``.

    :returns: the result of the lineage's ``names()``, or `None` when
        :func:`lineage_for_certname` finds no lineage.

    """
    found = lineage_for_certname(config, certname)
    if not found:
        return None
    return found.names()
def find_duplicative_certs(config, domains):
    """Find existing certs that match the given domain names.

    Two kinds of match are tracked while walking the lineages: a cert
    whose names equal ``domains`` exactly, and a cert whose names are a
    subset of ``domains``. When several subset certs exist, the one with
    the most names wins. Which cert is reported among several exact
    matches, or among equally sized subsets, is undefined.

    :param config: Configuration.
    :type config: :class:`certbot.configuration.NamespaceConfig`
    :param domains: List of domain names
    :type domains: `list` of `str`

    :returns: lineages representing the identically matching cert and the
        largest subset if they exist
    :rtype: `tuple` of `storage.RenewableCert` or `None`

    """
    def _track_best_matches(lineage, best_so_far):
        """Fold one lineage into the running (exact, subset) pair."""
        # TODO: Handle these differently depending on whether they are
        #       expired or still valid?
        exact, subset = best_so_far
        names = set(lineage.names())
        wanted = set(domains)
        if names == wanted:
            exact = lineage
        elif names <= wanted:
            # Keep the largest subset-names cert seen so far; ties keep
            # the earlier one.
            if subset is None or len(names) > len(subset.names()):
                subset = lineage
        return (exact, subset)

    return _search_lineages(config, _track_best_matches, (None, None))
def _archive_files(candidate_lineage, filetype):
    """List the archived ``.pem`` files of one kind for a lineage.

    A file qualifies when its name is exactly ``filetype``, followed by
    zero or more digits, followed by ``.pem`` (for example ``cert3.pem``
    for ``filetype="cert"``). Callables wrapping this function are
    eventually passed (in a list) to `match_and_check_overlaps` to help
    specify the acceptable_matches.

    :param `.storage.RenewableCert` candidate_lineage: Lineage whose
        archive dir is to be searched.
    :param str filetype: main file name prefix e.g. "fullchain" or "chain".

    :returns: Files in candidate_lineage's archive dir that match the
        provided filetype.
    :rtype: list of str or None

    """
    archive_dir = candidate_lineage.archive_dir
    # Escape the prefix and the dot, and anchor the end, so that names such
    # as "cert1.pem.bak" or "cert1xpem" are not mistaken for archive
    # members. re.match already anchors the start of the name.
    name_re = re.compile(r"{0}[0-9]*\.pem\Z".format(re.escape(filetype)))
    found = [os.path.join(archive_dir, entry)
             for entry in os.listdir(archive_dir)
             if name_re.match(entry)]
    # Callers expect None, not an empty list, when nothing matches.
    return found or None
def _acceptable_matches():
    """Build the list of callables passed to match_and_check_overlaps.

    Each callable takes a lineage and yields either a single path or
    whatever `_archive_files` returns for it. Kept as its own function to
    make unit testing easier.

    :returns: list of functions
    :rtype: list

    """
    def live_fullchain(lineage):
        return lineage.fullchain_path

    def live_cert(lineage):
        return lineage.cert_path

    def archived_certs(lineage):
        return _archive_files(lineage, "cert")

    def archived_fullchains(lineage):
        return _archive_files(lineage, "fullchain")

    return [live_fullchain, live_cert, archived_certs, archived_fullchains]
def cert_path_to_lineage(cli_config):
    """Resolve ``cli_config.cert_path`` to the name of its lineage.

    :param `configuration.NamespaceConfig` cli_config: parsed command line
        arguments

    :returns: a lineage name
    :rtype: str

    :raises `errors.Error`: If the specified cert path can't be matched to
        a lineage name.
    :raises `errors.OverlappingMatchFound`: If the matched lineage's
        archive is shared.

    """
    def requested_path(_lineage):
        # The value searched for is the same for every lineage.
        return cli_config.cert_path[0]

    def name_of(lineage):
        return lineage.lineagename

    matches = match_and_check_overlaps(
        cli_config, _acceptable_matches(), requested_path, name_of)
    return matches[0]
def match_and_check_overlaps(cli_config, acceptable_matches, match_func, rv_func):
    """Search all lineages for a match and reject duplicates.

    If more than one lineage matches, an error is raised, as performing
    operations on lineages that have their properties incorrectly
    duplicated elsewhere is probably a bad idea.

    :param `configuration.NamespaceConfig` cli_config: parsed command line
        arguments
    :param list acceptable_matches: a list of functions that specify
        acceptable matches
    :param function match_func: specifies what to match
    :param function rv_func: specifies what to return

    :returns: a one-element list holding ``rv_func`` of the matching lineage
    :raises errors.Error: if no lineage matches
    :raises errors.OverlappingMatchFound: if several lineages match

    """
    def find_matches(candidate_lineage, return_value, acceptable_matches):
        """Returns a list of matches using _search_lineages."""
        flattened = []  # type: List[str]
        for produce in acceptable_matches:
            produced = produce(candidate_lineage)
            # A producer may yield one value or a list of values.
            if isinstance(produced, list):
                flattened.extend(produced)
            else:
                flattened.append(produced)
        if match_func(candidate_lineage) in flattened:
            return_value.append(rv_func(candidate_lineage))
        return return_value

    matched = _search_lineages(cli_config, find_matches, [], acceptable_matches)
    if not matched:
        raise errors.Error(
            "No match found for cert-path {0}!".format(cli_config.cert_path[0]))
    if len(matched) > 1:
        raise errors.OverlappingMatchFound()
    return matched
def human_readable_cert_info(config, cert, skip_filter_checks=False):
    """Return a human readable description of a RenewableCert object.

    :param config: Configuration; ``certname`` and ``domains`` act as
        filters on which certs are described.
    :param cert: the cert to describe (reads ``lineagename``,
        ``is_test_cert``, ``target_expiry``, ``fullchain``, ``privkey`` and
        ``names()``).
    :param bool skip_filter_checks: when true, the ``config.certname``
        filter is not applied.

    :returns: the description, or an empty string when the cert is
        filtered out.
    :rtype: str

    """
    if config.certname and cert.lineagename != config.certname and not skip_filter_checks:
        return ""
    # NOTE(review): the operands of this condition were missing from the
    # text this block was recovered from; `config.domains` is the
    # reconstruction -- confirm against the upstream module.
    if config.domains and not set(config.domains).issubset(cert.names()):
        return ""

    # Built only after the filters so that skipped certs cost nothing.
    checker = ocsp.RevocationChecker()
    now = pytz.UTC.fromutc(datetime.datetime.utcnow())

    reasons = []
    if cert.is_test_cert:
        reasons.append('TEST_CERT')
    if cert.target_expiry <= now:
        reasons.append('EXPIRED')
    elif checker.ocsp_revoked(cert):
        # Revocation is only looked up for certs that have not expired.
        reasons.append('REVOKED')

    if reasons:
        status = "INVALID: " + ", ".join(reasons)
    else:
        diff = cert.target_expiry - now
        if diff.days == 1:
            status = "VALID: 1 day"
        elif diff.days < 1:
            status = "VALID: {0} hour(s)".format(diff.seconds // 3600)
        else:
            status = "VALID: {0} days".format(diff.days)

    valid_string = "{0} ({1})".format(cert.target_expiry, status)
    return (" Certificate Name: {0}\n"
            " Domains: {1}\n"
            " Expiry Date: {2}\n"
            " Certificate Path: {3}\n"
            " Private Key Path: {4}".format(
                cert.lineagename,
                " ".join(cert.names()),
                valid_string,
                cert.fullchain,
                cert.privkey))
def get_certnames(config, verb, allow_multiple=False, custom_prompt=None):
    """Get certname from flag, interactively, or error out.

    :param config: Configuration; a truthy ``config.certname`` is used
        directly, without prompting.
    :param str verb: action inserted into the default prompt text.
    :param bool allow_multiple: offer a checklist (several selections)
        instead of a single-choice menu.
    :param custom_prompt: prompt text replacing the default one, if truthy.

    :returns: the selected certificate name(s)
    :rtype: list

    :raises errors.Error: if no certificates exist or the user does not
        confirm a valid selection.

    """
    certname = config.certname
    if certname:
        return [certname]

    disp = zope.component.getUtility(interfaces.IDisplay)
    filenames = storage.renewal_conf_files(config)
    choices = [storage.lineagename_for_filename(name) for name in filenames]
    if not choices:
        raise errors.Error("No existing certificates found.")

    if allow_multiple:
        prompt = custom_prompt or (
            "Which certificate(s) would you like to {0}?".format(verb))
        code, certnames = disp.checklist(
            prompt, choices, cli_flag="--cert-name", force_interactive=True)
        if code != display_util.OK:
            raise errors.Error("User ended interaction.")
        return certnames

    prompt = custom_prompt or (
        "Which certificate would you like to {0}?".format(verb))
    # NOTE(review): the callee name was missing from the text this block
    # was recovered from; `disp.menu` is inferred from the (code, index)
    # result and the range check below -- confirm against upstream.
    code, index = disp.menu(
        prompt, choices, cli_flag="--cert-name", force_interactive=True)
    if code != display_util.OK or index not in range(0, len(choices)):
        raise errors.Error("User ended interaction.")
    return [choices[index]]
################### # Private Helpers ###################
def _report_lines(msgs):
    """Format a results report for a category of single-line renewal outcomes.

    Every message is converted with ``str`` and placed on its own line,
    each line carrying the same leading indent.
    """
    indent = " "
    rendered = [str(msg) for msg in msgs]
    return indent + ("\n" + indent).join(rendered)
def _report_human_readable(config, parsed_certs):
    """Format a results report for the parsed certs.

    The description of each cert, as produced by
    `human_readable_cert_info`, is joined with newlines.
    """
    return "\n".join(
        human_readable_cert_info(config, cert) for cert in parsed_certs)
def _describe_certs(config, parsed_certs, parse_failures):
    """Print information about the certs we know about.

    :param config: Configuration; used to decide whether the listing is
        announced as "matching" and passed on to the per-cert formatter.
    :param list parsed_certs: successfully loaded certs.
    :param list parse_failures: renewal configuration files that could not
        be loaded.

    """
    out = []  # type: List[str]

    if not parsed_certs and not parse_failures:
        out.append("No certs found.")
    else:
        if parsed_certs:
            # NOTE(review): the second operand was missing from the text
            # this block was recovered from; `config.domains` is the
            # reconstruction -- confirm against the upstream module.
            match = "matching " if config.certname or config.domains else ""
            out.append("Found the following {0}certs:".format(match))
            out.append(_report_human_readable(config, parsed_certs))
        if parse_failures:
            out.append("\nThe following renewal configurations "
                       "were invalid:")
            out.append(_report_lines(parse_failures))

    disp = zope.component.getUtility(interfaces.IDisplay)
    disp.notification("\n".join(out), pause=False, wrap=False)
def _search_lineages(cli_config, func, initial_rv, *args):
    """Fold ``func`` over every lineage that loads successfully.

    ``func`` is called as ``func(lineage, rv, *args)`` and its result
    becomes the ``rv`` for the next lineage, which allows flexible return
    values, including multiple return values and complex checks. Renewal
    files that fail to load are logged at debug level and skipped.

    :param `configuration.NamespaceConfig` cli_config: parsed command line
        arguments
    :param function func: function used while searching over lineages
    :param initial_rv: initial return value of the function (any type)

    :returns: the value produced by the last call to ``func``, or
        ``initial_rv`` if ``func`` was never called.

    """
    # Verify the directory is there
    util.make_or_verify_dir(cli_config.renewal_configs_dir, mode=0o755)

    result = initial_rv
    for conf_path in storage.renewal_conf_files(cli_config):
        try:
            lineage = storage.RenewableCert(conf_path, cli_config)
        except (errors.CertStorageError, IOError):
            logger.debug("Renewal conf file %s is broken. Skipping.", conf_path)
            logger.debug("Traceback was:\n%s", traceback.format_exc())
        else:
            result = func(lineage, result, *args)
    return result