Source code for meta_package_manager.cli

# Copyright Kevin Deldycke <kevin@deldycke.com> and contributors.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
"""The :command:`mpm` command-line interface.

Defines the Click command group and its subcommands. Each operation subcommand
(``installed``, ``outdated``, ``install``, ``upgrade``, ``remove``, ...) selects the
managers from :py:mod:`meta_package_manager.pool` that implement the matching
:py:class:`meta_package_manager.capabilities.Operations` action, runs it across all
of them, and renders the aggregated, multi-manager result.
"""

from __future__ import annotations

import logging
import platform
import sys
import threading
from collections import Counter, namedtuple
from collections.abc import Iterable
from configparser import RawConfigParser
from datetime import datetime, timezone
from functools import partial
from io import TextIOWrapper
from pathlib import Path
from textwrap import dedent
from unittest.mock import patch

import tomli_w
from boltons.cacheutils import LRI, cached
from click_extra import (
    STRING,
    Choice,
    EnumChoice,
    File,
    IntRange,
    Section,
    UsageError,
    VersionOption,
    argument,
    echo,
    file_path,
    group,
    jobs_option,
    option,
    option_group,
    pass_context,
    search_params,
)
from click_extra.context import PROGRESS, TABLE_FORMAT, VERBOSITY_LEVEL
from click_extra.highlight import HelpKeywords, highlight
from click_extra.logging import LogLevel
from click_extra.table import (
    SERIALIZATION_FORMATS,
    print_data,
)
from click_extra.theme import KO_GLYPH, OK_GLYPH, get_current_theme as theme
from extra_platforms import current_architecture, current_platform, reduce

from . import __version__, bar_plugin
from .bar_plugin_renderer import BarPluginRenderer
from .brewfile import build_brewfile
from .capabilities import Operations
from .config import (
    MpmConfig,
    apply_manager_overrides_from_context,
    build_manager_overrides_validator,
    dump_manager_overrides,
    print_contribution_hints,
)
from .duration import Duration
from .execution import (
    CLIError,
    OperationTrail,
    collect_from_managers,
    collect_per_package,
    highlight_cli_name,
    warn_jobs_ignored,
)
from .manager import PackageManager
from .package import packages_asdict
from .platforms import MAIN_PLATFORMS
from .pool import pool
from .sbom import (
    SBOM,
    SPDX,
    CycloneDX,
    ExportFormat,
    cyclonedx_support,
    spdx_support,
)
from .sorting import SortableField, print_sorted_table
from .specifier import VERSION_SEP, Solver, Specifier
from .summary import package_counts, print_summary, sbom_summary
from .version import diff_versions

if sys.version_info >= (3, 11):
    import tomllib
else:
    import tomli as tomllib  # type: ignore[import-not-found]

TYPE_CHECKING = False
if TYPE_CHECKING:
    from collections.abc import Callable, Iterator
    from typing import IO

    from click_extra import Context, Parameter

    from .package import Package


# Subcommand sections.
EXPLORE = Section("Explore subcommands")
MAINTENANCE = Section("Maintenance subcommands")
SNAPSHOTS = Section("Package snapshots subcommands")
SBOM_SECTION = Section("SBOM subcommands")


XKCD_MANAGER_ORDER = ("pip", "brew", "npm", "dnf", "apt", "steamcmd")
"""Sequence of package managers as defined by `XKCD #1654: Universal Install Script
<https://xkcd.com/1654/>`_.

See the corresponding :issue:`implementation rationale in issue #10 <10>`.
"""

COOLDOWN_SUPPORTED_MANAGERS = tuple(
    sorted(mid for mid, manager in pool.items() if manager.supports_cooldown)
)
"""IDs of the managers that natively enforce a release-age :option:`mpm --cooldown`.

Derived from the pool so the ``--cooldown`` help text never drifts from the set of
managers that actually carry a :py:attr:`cooldown_env_var
<meta_package_manager.execution.CLIExecutor.cooldown_env_var>`: adding cooldown
support to a manager surfaces it here automatically.
"""


[docs] def is_stdout(filepath: Path) -> bool: """Check if a file path is set to stdout. Prevents the creation of a ``-`` file in the current directory. """ return str(filepath) == "-"
[docs] def prep_path(filepath: Path) -> IO | None: """Prepare the output file parameter for Click's echo function.""" if is_stdout(filepath): return None return filepath.open("w", encoding="UTF-8")
[docs] def guard_existing_output(ctx: Context, output_path: Path, *, overwrite: bool) -> None: """Block clobbering an existing output file unless ``overwrite`` is set. Warns and exits with code 2 when ``output_path`` already exists and the user did not pass ``--overwrite``/``--force``/``--replace``. No-op when the file is absent. Callers handle the stdout case separately. """ if output_path.exists(): msg = "Target file exist and will be overwritten." if overwrite: logging.warning(msg) else: logging.critical(msg) ctx.exit(2)
[docs] def update_manager_selection( ctx: Context, param: Parameter, value: str | Iterable[str] | bool | None ) -> None: """Update global selection list of managers in the context. Accumulate and merge all manager selectors to form the initial population enforced by the user. """ # Option has not been called. if value is None: return # Use a list to keep the natural order of selection. to_add: list[str] = [] # Use a set because removal takes precedence over addition: we don't care # about user's order. to_remove: set[str] = set() assert param.name # Add the value of --manager list. if param.name == "manager": if value: assert isinstance(value, Iterable) to_add.extend(value) # Add the value of --exclude list. elif param.name == "exclude": if value: assert isinstance(value, Iterable) to_remove.update(value) # Update the list of managers with the XKCD preset. elif param.name == "xkcd": if value: to_add.extend(XKCD_MANAGER_ORDER) # Update selection with single selectors. else: # Because the parameter's name is transformed into a Python identifier on # instantiation, we have to reverse the process to get our value. # Example: --apt-mint => apt_mint => apt-mint manager_id = param.name.removeprefix("no_").replace("_", "-") assert manager_id in pool.all_manager_ids, ( f"unrecognized single manager selector {param.name!r}" ) # Normalize the value to a boolean. if isinstance(value, str): value = RawConfigParser.BOOLEAN_STATES.get(value.lower(), value) assert value in ( manager_id, True, False, ), f"unexpected value {value!r} for {param!r}" if param.name.startswith("no_") ^ (value is False): to_remove.add(manager_id) else: to_add.append(manager_id) logging.debug(f"Managers added by {param}: {to_add}") logging.debug(f"Managers removed by {param}: {to_remove}") # Initialize the shared context object to accumulate there the selection results. if ctx.obj is None: ctx.obj = {} if to_add: ctx.obj.setdefault("managers_to_add", []).extend(to_add) if to_remove: ctx.obj.setdefault("managers_to_remove", set()).update(to_remove)
[docs] def single_manager_selectors(): """Dynamiccaly creates a dedicated flag selector alias for each manager.""" single_flags = [] single_no_flags = [] for manager_id, manager in pool.items(): single_flags.append( option( f"--{manager_id}", flag_value=manager_id, default=None, help=f"Select {manager.name}.", deprecated=manager.deprecated, expose_value=False, callback=update_manager_selection, ) ) single_no_flags.append( option( f"--no-{manager_id}", flag_value=manager_id, default=None, help=f"Deselect {manager.name}.", deprecated=manager.deprecated, expose_value=False, callback=update_manager_selection, ) ) return *single_flags, *single_no_flags
[docs] def bar_plugin_path(ctx: Context, param: Parameter, value: str | None): """Print the location of the :doc:`Xbar/SwiftBar plugin <bar-plugin>`. Returns the normalized path of the standalone `bar_plugin.py <https://github.com/kdeldycke/meta-package-manager/blob/main/meta_package_manager/bar_plugin.py>`_ script that is distributed with this Python module. This is made available under the :option:`mpm --bar-plugin-path` option. Notice that the fully-qualified home directory get replaced by its shorthand (``~``) if applicable: - the full ``/home/user/.python/site-packages/mpm/bar_plugin.py`` path is simplified to ``~/.python/site-packages/mpm/bar_plugin.py``, - but ``/usr/bin/python3.10/mpm/bar_plugin.py`` is returned as-is. """ # Option has not been called. if not value: return # Options is only available when CLI is installed from sources, not CLI is # a bundled executable. if "__compiled__" in globals(): logging.debug("CLI running as a binary.") logging.critical( "Option --bar-plugin-path is only available for CLI installed from " "sources.", ) ctx.exit(2) bar_path = Path(bar_plugin.__file__).expanduser().resolve() home_dir = Path.home() if bar_path.is_relative_to(home_dir): home_shorthand = Path("~") shorten_bar_path = home_shorthand / bar_path.relative_to(home_dir) assert shorten_bar_path.expanduser().resolve() == bar_path bar_path = shorten_bar_path echo(bar_path) ctx.exit()
@group( # Verbosity stays at click-extra's WARNING default: the βœ“/βœ— trail and finisher # print via echo (not logging) and survive it, so a default run shows just those # plus real warnings and critical. Per-operation narration (priority, # announcements, skip reasons) sits at INFO, one --verbosity INFO away. config_schema=MpmConfig, config_validators=(build_manager_overrides_validator(pool),), version_fields={ "env_info": ( f"Python {platform.python_version()}, " f"{current_platform().name} {current_architecture().name}" ), }, ) @option_group( "Package manager selection", # ---------------------- 80 characters reference limit ----------------------- # dedent("""\ \b Use these options to restrict the subcommand to a subset of managers. \b - By default, mpm will evaluate all managers supported on the current platform. - Use the --<manager-id> selectors to restrict target to a subset of managers. - To remove a manager from the selection, use --no-<manager-id> selectors. - Order of the selectors is preserved for priority-sensitive subcommands. - Exclusion of a manager always takes precedence over its inclusion. \b """), *single_manager_selectors(), option( "-a", "--all-managers", is_flag=True, default=False, help="Force evaluation of all managers implemented by mpm, including those " "not supported by the current platform or deprecated. Still applies filtering " "by --<manager-id> / --no-<manager-id> options before calling the subcommand.", ), option( "-x", "--xkcd", is_flag=True, default=None, expose_value=False, callback=update_manager_selection, help="Preset manager selection as defined by XKCD #1654. Equivalent to: " "{}.".format(" ".join(f"--{mid}" for mid in XKCD_MANAGER_ORDER)), ), option( "-m", "--manager", type=Choice(pool.all_manager_ids, case_sensitive=False), multiple=True, default=None, expose_value=False, hidden=True, callback=update_manager_selection, deprecated="Use --<manager-id> single selector instead.", help="Select a manager.", ), option( "-e", "--exclude", type=Choice(pool.all_manager_ids, case_sensitive=False), multiple=True, default=None, expose_value=False, hidden=True, callback=update_manager_selection, deprecated="Use --no-<manager-id> single selector instead.", help="Exclude a manager.", ), ) @option_group( "Manager options", option( "--ignore-auto-updates/--include-auto-updates", default=True, help="Report all outdated packages, including those tagged as " "auto-updating. Only applies to outdated and upgrade subcommands.", ), option( "--stop-on-error/--continue-on-error", default=False, help="Stop right away or continue operations on manager CLI error.", ), option( "-d", "--dry-run", is_flag=True, default=False, help="Do not actually perform any action, just simulate CLI calls.", ), option( "-t", "--timeout", type=IntRange(min=0), default=None, help="Maximum duration in seconds for each CLI call. Applies to every " "manager and operation. When unset, a per-operation default is used " "instead: a short cap for read-only queries (installed, outdated, search) " "and a longer one for state-changing operations (install, upgrade, remove, " "sync, cleanup).", ), jobs_option( "-j", "--jobs", help="Maximum number of managers to run concurrently. Defaults to one " "less than the CPU count; set 1 to run sequentially. Applies to read-only " "queries (installed, outdated, search), maintenance commands (sync, " "cleanup, upgrade --all), and the state changers (install, remove, " "upgrade, restore), which fan out across managers while running each " "manager's own packages one at a time. Installing a package left untied " "to a manager stays sequential.", ), option( "--cooldown", type=Duration(), default="", metavar="DURATION", help="Refuse to install or upgrade any package version published more " "recently than this duration, as a mitigation against supply-chain " "attacks. Accepts a friendly duration ('7 days', '1 week', '12h'), an " "ISO 8601 duration ('P7D', 'PT12H'), or an RFC 3339 absolute timestamp " "('2024-05-01T00:00:00Z'). Only honored by managers with native " "release-age support (" + ", ".join(COOLDOWN_SUPPORTED_MANAGERS) + "); the " "others are skipped unless --allow-unsupported-managers is set.", ), option( "--require-cooldown-support/--allow-unsupported-managers", default=True, help="When --cooldown is set, whether to require each manager to natively " "enforce it. The default (--require-cooldown-support) skips managers that " "cannot, so nothing slips in unguarded (fail-closed). " "--allow-unsupported-managers runs install and upgrade on them anyway, " "trading the supply-chain safeguard for broader manager coverage.", ), ) @option_group( "Output options", option( "--description", is_flag=True, default=False, help="Show package description in results.", ), option( "-s", "--sort-by", type=EnumChoice(SortableField), multiple=True, default=(SortableField.MANAGER_ID,), help="Sort results by this field. Repeat to add tie-breakers in priority " "order, like '-s manager_id -s package_id'.", ), # option('--sort-asc/--sort-desc', default=True) option( "--summary/--no-summary", default=True, help=( "Print an end-of-run summary on stderr: a count line of " "per-manager totals plus any subcommand-specific follow-up " "notes (like SBOM enrichment and merge counts). Defaults on; " "use --no-summary to silence." ), ), option( "--suggest-contribs/--no-suggest-contribs", default=True, help="Print a contribution invitation when a user override targets a " "field that likely indicates an upstream detection bug " "(cli_names, cli_search_path, requirement, version_cli_options, " "version_regexes).", ), ) @option_group( "Xbar/SwiftBar options", option( "--bar-plugin-path", is_flag=True, default=False, expose_value=False, is_eager=True, callback=bar_plugin_path, help="Print location of the Xbar/SwiftBar plugin.", ), ) @pass_context def mpm( ctx, all_managers, ignore_auto_updates, stop_on_error, dry_run, timeout, cooldown, require_cooldown_support, description, sort_by, summary, suggest_contribs, ): """CLI options shared by all subcommands.""" # Silence all log messages for serialization rendering unless in debug mode. if ( ctx.meta[TABLE_FORMAT] in SERIALIZATION_FORMATS and ctx.meta[VERBOSITY_LEVEL] != LogLevel.DEBUG ): logging.disable() def remove_logging_override(): """Reset the logging override to its default state. ``logging.disable()`` mess with the logging module internals at the root level. We need to restore the default behavior when the context is closed, otherwise the logging module will be stuck in a disabled state. See: https://docs.python.org/3/library/logging.html?highlight=logging#logging.disable """ logging.disable(logging.NOTSET) ctx.call_on_close(remove_logging_override) # click-extra's default --progress/--no-progress option resolves the user's # intent (lowered by --accessible) into ctx.meta[PROGRESS], # decoupled from color. mpm layers on its own output-mode gating: no spinner in # serialized output or at DEBUG verbosity (where logs already narrate). The TTY # and TERM=dumb checks are left to the spinner widget (see _make_spinner). show_progress = ( ctx.meta[PROGRESS] and ctx.meta[TABLE_FORMAT] not in SERIALIZATION_FORMATS and ctx.meta[VERBOSITY_LEVEL] != LogLevel.DEBUG ) # Apply per-manager attribute overrides from [mpm.managers.<id>] sections of # the config file, before any subcommand observes the pool. Also collects any # contribution-hint candidates onto ctx.meta for the close-time callback below. apply_manager_overrides_from_context(ctx, pool) if suggest_contribs: ctx.call_on_close(partial(print_contribution_hints, ctx)) # Snapshot per-manager error counts so the close-time summary below only # reports errors that accumulated during *this* invocation. The pool is a # module-level singleton and survives across calls (e.g. in test runs that # exercise multiple invocations in the same process), so a naive non-empty # check would warn about errors from prior runs. initial_error_counts = {mid: len(m.cli_errors) for mid, m in pool.register.items()} def summarize_cli_errors(): """End-of-run hint when underlying CLIs reported errors. Captured stderr is logged at DEBUG (see :py:func:`CLIExecutor.run_cli`) so a default-verbosity run no longer floods the table with gem extension warnings, mas Spotlight chatter, etc. This summary preserves the "something went sideways" signal in one line, without replicating the noise. Skipped at DEBUG verbosity (the stderr already appeared inline) and in serialization formats (logging is disabled and ``cli_errors`` ships in the structured payload anyway). """ if logging.getLogger().getEffectiveLevel() <= logging.DEBUG: return failed = sorted( mid for mid, manager in pool.register.items() if len(manager.cli_errors) > initial_error_counts.get(mid, 0) ) if not failed: return ids = ", ".join(map(theme().invoked_command, failed)) plural = "managers" if len(failed) > 1 else "manager" logging.warning( f"{len(failed)} {plural} reported errors during this run " f"({ids}); re-run with --verbosity DEBUG for details.", ) ctx.call_on_close(summarize_cli_errors) # Normalize to None if no manager selectors have been used. This prevent the # pool.select_managers() method to iterate over an empty population of managers to # choose from. user_selection = None managers_to_remove = None if ctx.obj: user_selection = ctx.obj.get("managers_to_add", None) managers_to_remove = ctx.obj.get("managers_to_remove", None) # Sentinel to print the selection summary on the first call only. selection_logged = False def selected_managers(**kwargs): """Select the subset of managers to target, and apply manager-level options. The selection summary is logged at ``DEBUG`` on the first call only. The ``βœ“``-trailed spinner from :py:func:`meta_package_manager.execution.collect_from_managers` already names every manager that ran, so this summary is redundant at default verbosity for read-only commands; it is kept for troubleshooting, where it also surfaces config-driven drops that never appear in the trail. Logging on the first call only keeps subcommands that never resolve the pool (like ``--help``) silent. Callers may pass ``keep=<ids>`` to narrow the selection to a specific subset (for example, the managers that implement a given operation). When provided it overrides the global ``user_selection`` for that call. """ nonlocal selection_logged if not selection_logged: if user_selection: selected = " > ".join(map(theme().invoked_command, user_selection)) logging.info(f"Selected managers (by priority): {selected}.") else: logging.info("Selected managers: platform defaults.") if managers_to_remove: dropped = ", ".join( map(theme().invoked_command, sorted(managers_to_remove)) ) logging.info(f"Dropped managers: {dropped}.") else: logging.info("Dropped managers: none.") selection_logged = True keep = kwargs.pop("keep", user_selection) return pool.select_managers( keep=keep, drop=managers_to_remove, keep_deprecated=all_managers, # Should we include auto-update packages or not? ignore_auto_updates=ignore_auto_updates, # Does the manager should raise on error or not. stop_on_error=stop_on_error, dry_run=dry_run, timeout=timeout, progress=show_progress, # Minimum release age gate and its fail-open escape hatch. cooldown=cooldown, require_cooldown_support=require_cooldown_support, **kwargs, ) # Load up current and new global options to the context for subcommand consumption. ctx.obj = namedtuple( "GlobalOptions", ( "all_managers", "user_selection", "user_drops", "selected_managers", "description", "sort_by", "summary", ), defaults=( all_managers, user_selection, managers_to_remove, selected_managers, description, sort_by, summary, ), )() # Override ctx.print_table with mpm's field-aware sorted variant, so # subcommands keep their (headers, table, sort_by) call contract. ctx.print_table = partial( print_sorted_table, table_format=ctx.meta[TABLE_FORMAT], ) # Extend --version output with Python and platform metadata. version_option = search_params(mpm.params, VersionOption) if isinstance(version_option, VersionOption): version_option.message = "{prog_name}, version {version}\n{env_info}" # Highlight placeholder option names that appear in the help text prose. mpm.extra_keywords = HelpKeywords( # type: ignore[attr-defined] long_options={"--<manager-id>", "--no-<manager-id>"}, ) # "version" is a --sort-by choice but too common a word in help text. mpm.excluded_keywords = HelpKeywords(choices={"version"}) # type: ignore[attr-defined] @mpm.command( short_help="List every registered package manager and check its presence " "on the system.", section=EXPLORE, ) @pass_context def managers(ctx): """List every package manager detected on the system. Only reports by default all managers supported on the current platform. To include unsupported and deprecated managers in the report, use the :option:`--all-managers` flag. User's own selection configuration are intentionally ignored, so a manager dropped from regular operations is still visible here for troubleshooting. To narrow down the report to a subset of managers, pass the same selectors as for other subcommands (e.g. :option:`--pip` or :option:`--no-apt`). """ if ctx.obj.user_drops: dropped = ", ".join(map(theme().invoked_command, sorted(ctx.obj.user_drops))) logging.info(f"Ignoring user exclusion of {dropped}.") inventory = partial( pool.select_managers, keep=ctx.obj.user_selection, drop=None, keep_deprecated=ctx.obj.all_managers, # Keep managers whose CLI was not found, to show how mpm reacts to the # local platform. drop_not_found=False, ) # Machine-friendly data rendering. table_format = ctx.meta[TABLE_FORMAT] if table_format in SERIALIZATION_FORMATS: manager_data = {} # Build up the data structure of manager metadata. fields = ( "name", "id", "supported", "cli_path", "executable", "version", "fresh", "available", ) for manager in inventory(): manager_data[manager.id] = {fid: getattr(manager, fid) for fid in fields} # Serialize errors at the last minute to gather all we encountered. manager_data[manager.id]["errors"] = list( {expt.error for expt in manager.cli_errors}, ) print_serialized_and_exit(ctx, manager_data) # Human-friendly content rendering. table = [] for manager in inventory(): # Build up the OS column content. os_infos = ( theme().success(OK_GLYPH) if manager.supported else theme().error(KO_GLYPH) ) if not manager.supported: os_infos += " {}".format( ", ".join( sorted(p.name for p in reduce(manager.platforms, MAIN_PLATFORMS)) ), ) if manager.deprecated: os_infos += f" {theme().warning('(deprecated)')}" # Build up the CLI path column content. cli_infos = "{} {}".format( theme().success(OK_GLYPH) if manager.cli_path else theme().error(KO_GLYPH), highlight_cli_name(manager.cli_path, manager.cli_names) if manager.cli_path else ( f"{', '.join(map(theme().invoked_command, manager.cli_names))} not found" ), ) # Build up the version column content. version_infos = "" if manager.executable: version_infos = ( theme().success(OK_GLYPH) if manager.fresh else theme().error(KO_GLYPH) ) if manager.version: version_infos += f" {manager.version}" if not manager.fresh: version_infos += f" {manager.requirement}" table.append( ( getattr(theme(), "success" if manager.fresh else "error")(manager.id), manager.name, os_infos, cli_infos, theme().success(OK_GLYPH) if manager.executable else "", version_infos, ), ) ctx.find_root().print_table( ( ("Manager ID", SortableField.MANAGER_ID), ("Name", SortableField.MANAGER_NAME), ("Supported", None), ("CLI", None), ("Executable", None), ("Version", SortableField.VERSION), ), table, ctx.obj.sort_by, ) def _manager_result( manager: PackageManager, packages: tuple[dict, ...] ) -> tuple[str, dict]: """Build the standard ``(id, payload)`` result for a read-only manager query. The payload shape β€” ``id``, ``name``, ``packages``, ``errors`` β€” is shared by the ``installed``, ``outdated`` and ``search`` subcommands, their serialized output and their table rendering. ``errors`` is collected at the last minute so it gathers every distinct CLI error the manager accumulated during the query; a non-empty list also marks the manager's ``βœ—`` in the concurrent spinner trail (see :py:func:`meta_package_manager.execution.collect_from_managers`). """ return manager.id, { "id": manager.id, "name": manager.name, "packages": packages, # Serialize errors at the last minute to gather all we encountered. "errors": list({expt.error for expt in manager.cli_errors}), } def _safe_packages( manager: PackageManager, source: Callable[[], Iterable[Package]], fields: tuple[str, ...], action: str, ) -> tuple[dict, ...]: """Materialize ``source()`` into package dicts, tolerating a CLI failure. On :py:class:`meta_package_manager.execution.CLIError` (the manager's query subprocess failed), log a one-line ``"Could not {action} from {manager}"`` warning and return no packages, so one broken manager never aborts the batch. """ try: return tuple(packages_asdict(source(), fields)) except CLIError: logging.warning( f"Could not {action} from {theme().invoked_command(manager.id)}." ) return () def _filter_matches( packages: Iterable[Package], query: str | None, *, exact: bool, ) -> Iterator[Package]: """Yield only the packages matching ``query`` on their ID or name. A transparent pass-through when ``query`` is ``None`` (no positional query was given). Shared by the ``installed`` and ``outdated`` subcommands to post-filter the fully-materialized package list each manager returns: unlike ``search``, these operations already hold the complete inventory, so the query is a local refinement rather than a manager-side lookup. Mirrors the fuzzy/``--exact`` semantics of ``search`` through :py:meth:`meta_package_manager.package.Package.matches`. """ for package in packages: if query is None or package.matches(query, exact=exact): yield package def _query_highlighter(query: str | None) -> Callable[[str], str]: """Build a highlighter that emphasizes ``query`` matches in table cells. Returns a cached, case-insensitive callable that wraps each occurrence of the query (and its alphanumeric parts) in the active theme's ``search`` style, so the matched substring stands out in the rendered table. When no query was given, returns an identity function instead, leaving cells untouched. Shared by the ``search``, ``installed`` and ``outdated`` renderers. """ if not query: return lambda value: value patterns = {query}.union(PackageManager.query_parts(query)) highlighter: Callable[[str], str] = cached(LRI(max_size=1000))( partial( highlight, patterns=patterns, styling_func=theme().search, ignore_case=True, ), ) return highlighter @mpm.command(aliases=["list"], short_help="List installed packages.", section=EXPLORE) @option( "--exact/--fuzzy", default=False, help="With a QUERY, only keep packages whose ID or name matches it exactly, " "instead of the default case-insensitive, tokenized (fuzzy) match. No effect " "without a QUERY.", ) @option( "-d", "--duplicates", is_flag=True, default=False, help="Only list installed packages sharing the same ID. Implies " "`--sort-by package_id` to make duplicates easier to compare between themselves.", ) @argument("query", type=STRING, required=False) @pass_context def installed(ctx, exact, duplicates, query): """List all packages installed on the system by each manager. With an optional ``QUERY``, restrict the listing to installed packages whose ID or name matches it. The match is fuzzy by default (case-insensitive, tokenized); ``--exact`` requires a verbatim match on the package ID or name. """ # Build-up a global dict of installed packages per manager. fields = ( "id", "name", "installed_version", ) managers = list( ctx.obj.selected_managers(implements_operation=Operations.installed), ) def fetch(manager: PackageManager) -> tuple[str, dict]: # installed_or_empty() never raises (it swallows errors into an empty # result), so no CLIError guard is needed here. packages = tuple( packages_asdict( _filter_matches(manager.installed_or_empty(), query, exact=exact), fields, ) ) return _manager_result(manager, packages) installed_data = { manager_id: data for manager_id, data in collect_from_managers( "Listing", "Listed", managers, fetch ) } # Filters out non-duplicate packages. if duplicates: # Re-group packages by their IDs. package_sources: dict[str, set[str]] = {} for manager_id, installed_pkg in installed_data.items(): for package in installed_pkg["packages"]: package_sources.setdefault(package["id"], set()).add(manager_id) logging.debug(f"Managers sourcing each package: {package_sources}") # Identify package IDs shared by multiple managers. duplicates_ids = { pid for pid, managers in package_sources.items() if len(managers) > 1 } logging.debug(f"Duplicates: {duplicates_ids}") # Remove non-duplicates from results. for manager_id, manager_data in installed_data.items(): duplicate_packages = tuple( p for p in manager_data["packages"] if p["id"] in duplicates_ids ) manager_data["packages"] = duplicate_packages # Machine-friendly data rendering. print_serialized_and_exit(ctx, installed_data) # Human-friendly content rendering, highlighting the query matches (if any). highlight_query = _query_highlighter(query) table = [] for manager_id, installed_pkg in installed_data.items(): table += [ ( highlight_query(info["id"]) if info["id"] else "", highlight_query(info["name"]) if info["name"] else "", manager_id, str(info["installed_version"]) if info["installed_version"] else "?", ) for info in installed_pkg["packages"] ] # Force sorting by package ID in duplicate mode. sort_by = ctx.obj.sort_by if duplicates: logging.info( "Force table sorting on package ID because of --duplicates option." ) sort_by = (SortableField.PACKAGE_ID,) # Print table. ctx.find_root().print_table( ( ("Package ID", SortableField.PACKAGE_ID), ("Name", SortableField.PACKAGE_NAME), ("Manager", SortableField.MANAGER_ID), ("Installed version", SortableField.VERSION), ), table, sort_by, ) if ctx.obj.summary: print_summary(package_counts(installed_data)) @mpm.command(short_help="List outdated packages.", section=EXPLORE) @option( "--exact/--fuzzy", default=False, help="With a QUERY, only keep packages whose ID or name matches it exactly, " "instead of the default case-insensitive, tokenized (fuzzy) match. No effect " "without a QUERY.", ) @option( "--plugin-output", is_flag=True, default=False, help="Output results for direct consumption by an Xbar/SwiftBar-compatible plugin. " "The layout is dynamic and depends on environment variables set by either Xbar " "or SwiftBar.", ) @argument("query", type=STRING, required=False) @pass_context def outdated(ctx, exact, plugin_output, query): """List available package upgrades and their versions for each manager. With an optional ``QUERY``, restrict the listing to outdated packages whose ID or name matches it. The match is fuzzy by default (case-insensitive, tokenized); ``--exact`` requires a verbatim match on the package ID or name. """ # Build-up a global list of outdated packages per manager. fields = ( "id", "name", "installed_version", "latest_version", ) managers = list( ctx.obj.selected_managers(implements_operation=Operations.outdated), ) def fetch(manager: PackageManager) -> tuple[str, dict]: packages = _safe_packages( manager, lambda: _filter_matches(manager.refiltered_outdated, query, exact=exact), fields, "list outdated packages", ) return _manager_result(manager, packages) outdated_data = { manager_id: data for manager_id, data in collect_from_managers( "Checking", "Checked", managers, fetch ) } # Machine-friendly data rendering. print_serialized_and_exit(ctx, outdated_data) # Xbar/SwiftBar-friendly plugin rendering. if plugin_output: BarPluginRenderer().print(outdated_data) ctx.exit() # Human-friendly content rendering, highlighting the query matches (if any). highlight_query = _query_highlighter(query) table = [] for manager_id, outdated_pkg in outdated_data.items(): for info in outdated_pkg["packages"]: installed_version, latest_version = diff_versions( info["installed_version"] if info["installed_version"] else "?", info["latest_version"], ) table.append( ( highlight_query(info["id"]) if info["id"] else "", highlight_query(info["name"]) if info["name"] else "", manager_id, installed_version, latest_version, ), ) # Sort and print table. ctx.find_root().print_table( ( ("Package ID", SortableField.PACKAGE_ID), ("Name", SortableField.PACKAGE_NAME), ("Manager", SortableField.MANAGER_ID), ("Installed version", SortableField.VERSION), ("Latest version", None), ), table, ctx.obj.sort_by, ) if ctx.obj.summary: print_summary(package_counts(outdated_data)) # TODO: make it a --search-strategy=[exact, fuzzy, extended] # Add details helps => exact: is case-sensitive, and keep all non-alnum chars # fuzzy: query is case-insensitive, stripped-out of non-alnum chars and # tokenized (no order sensitive) # extended, same as fuzzy, but do not limit search to package ID and name. # extended to description and other metadata depending on manager support. # Modes: # 1. strict (--exact, on ID or name) # 2. substring (regex, no case, no split) # 3. fuzzy (token-based) # 4. extended (fuzzy + metadata) @mpm.command(short_help="Search packages.", section=EXPLORE) @option( "--extended/--id-name-only", default=False, help="Extend search to description, instead of restricting it to package ID and " "name. Implies --description.", ) @option( "--exact/--fuzzy", default=False, help="Only returns exact matches on package ID or name.", ) @option( "--refilter/--no-refilter", default=True, help="Let mpm refilters managers' search results.", ) @argument("query", type=STRING, required=True) @pass_context def search(ctx, extended, exact, refilter, query): """Search each manager for a package ID, name or description matching the query.""" # --extended implies --description. show_description = ctx.obj.description if extended and not ctx.obj.description: logging.info("--extended option forces --description option.") show_description = True # Build-up a global list of package matches per manager. fields = ( "id", "name", "latest_version", "description", ) search_method = "refiltered_search" if refilter else "search" managers = list( ctx.obj.selected_managers(implements_operation=Operations.search), ) def fetch(manager: PackageManager) -> tuple[str, dict]: packages = _safe_packages( manager, lambda: getattr(manager, search_method)(query, extended, exact), fields, "search packages", ) return _manager_result(manager, packages) matches = { manager_id: data for manager_id, data in collect_from_managers( "Searching", "Searched", managers, fetch ) } # Machine-friendly data rendering. print_serialized_and_exit(ctx, matches) # Human-friendly content rendering, highlighting the query matches. highlight_query = _query_highlighter(query) table = [] for manager_id, matching_pkg in matches.items(): for pkg in matching_pkg["packages"]: line = [ highlight_query(pkg["id"]) if pkg["id"] else "", highlight_query(pkg["name"]) if pkg["name"] else "", manager_id, str(pkg["latest_version"]) if pkg["latest_version"] else "?", ] if show_description: line.append( highlight_query(pkg.get("description")) if pkg.get("description") else "", ) table.append(line) # Sort and print table. headers: list[tuple[str, str | None]] = [ ("Package ID", SortableField.PACKAGE_ID), ("Name", SortableField.PACKAGE_NAME), ("Manager", SortableField.MANAGER_ID), ("Latest version", SortableField.VERSION), ] if show_description: headers.append(("Description", None)) ctx.find_root().print_table(headers, table, ctx.obj.sort_by) if ctx.obj.summary: print_summary(package_counts(matches)) @mpm.command(aliases=["locate"], short_help="Locate CLIs on system.", section=EXPLORE) @argument("cli_names", type=STRING, nargs=-1, required=True) @pass_context def which(ctx, cli_names): """Search from the user's environment all CLIs matching the query. This is mpm's own version of the `which -a` UNIX command, used internally to locate binaries for each manager. It is exposed as a subcommand for convenience and to help troubleshoot CLI resolution logic. Compared to the venerable `which` command, this will respect the additional path configured for each package manager. It will ignore files that are empty (0 size). On Windows, it additionally suppress the default lookup in the current directory, which takes precedence on other paths. """ if ctx.obj.sort_by: logging.info("Ignore --sort-by option for which command.") # Machine-friendly data rendering. table_format = ctx.meta[TABLE_FORMAT] if table_format in SERIALIZATION_FORMATS: cli_data = [ { "manager_id": manager.id, "cli_paths": list(manager.search_all_cli(cli_names)), } for manager in ctx.obj.selected_managers() ] print_serialized_and_exit(ctx, cli_data) # Print table. table = [] for manager in ctx.obj.selected_managers(): for priority, found_cli in enumerate(manager.search_all_cli(cli_names)): # Resolve symlinks and highlight the CLI name. symlink = "" if found_cli.is_symlink(): # resolve() always returns a Path, so highlight_cli_name won't return None. resolved = highlight_cli_name(found_cli.resolve(), cli_names) assert resolved is not None symlink = f"β†’ {resolved}" table.append( ( manager.id, str(priority), highlight_cli_name(found_cli, cli_names), symlink, ), ) ctx.find_root().print_table( ( ("Manager ID", SortableField.MANAGER_ID), ("Priority", None), ("CLI path", None), ("Symlink destination", None), ), table, ctx.obj.sort_by, ) @mpm.command( name="config-template", short_help="Print per-manager overrides as a TOML config template.", section=EXPLORE, ) @argument( "manager_ids", type=Choice(pool.all_manager_ids, case_sensitive=False), nargs=-1, ) @pass_context def config_template(ctx, manager_ids): """Print the overridable attributes of one or more managers as a TOML config template. Each block is a valid ``[mpm.managers.<id>]`` section ready to paste into a standalone config file or a ``[tool.mpm]`` ``pyproject.toml`` block. The output lists every overridable field with its current value so it doubles as the canonical reference for what each manager exposes: prune the rows that don't apply and customize the rest. With no positional arguments, every maintained (non-deprecated) manager is dumped. Pass one or more manager IDs to restrict the output. """ target_ids = manager_ids or pool.maintained_manager_ids overrides = {mid: dump_manager_overrides(pool[mid]) for mid in target_ids} echo(tomli_w.dumps({"mpm": {"managers": overrides}}), nl=False)
[docs] def cooldown_permits(manager: PackageManager) -> bool: """Decide whether a release-introducing operation may run on ``manager``. Returns ``True`` when no cooldown is active, when the manager can enforce it natively, or when the user opted out of the requirement with ``--allow-unsupported-managers``. Returns ``False`` (after logging the skip) when an active cooldown cannot be enforced and the requirement still holds, so the caller leaves the manager alone rather than letting a freshly-published version slip in. """ if manager.cooldown is None or manager.supports_cooldown: return True if not manager.require_cooldown_support: logging.warning( f"{theme().invoked_command(manager.id)} cannot enforce the release-age " "cooldown; running it without the supply-chain safeguard.", ) return True logging.warning( f"Skip {theme().invoked_command(manager.id)}: it cannot enforce the " "release-age cooldown. Use --allow-unsupported-managers to run it anyway.", ) return False
[docs] def package_label(spec: Specifier) -> str: """Render a spec as ``package_id`` or ``package_id@version`` for trail output.""" if spec.version: return f"{spec.package_id}{VERSION_SEP}{spec.version}" return spec.package_id
def _package_task( manager: PackageManager, spec: Specifier, lock: threading.Lock, *, action: Callable[[PackageManager, Specifier], str | None], verb: str, past: str, prep: str, record_failure: Callable[[Specifier], None], ) -> Callable[[], tuple[bool, str]]: """Build one per-package task for :func:`collect_per_package`. Runs ``action(manager, spec)`` with the manager forced to raise on failure (so a botched operation is recorded, not swallowed), narrates the outcome at ``INFO``, and returns ``(ok, message)`` for the ``βœ“``/``βœ—`` trail. On failure it appends the spec to a caller-owned list through ``record_failure`` (under ``lock``, since the list is shared across the concurrent lanes) and reports ``βœ—``. Shared by ``install``, ``remove``, ``upgrade <packages>`` and ``restore``, whose tasks differ only in the action, the verb forms, and which failure list they feed. :param action: performs the manager operation, returning its CLI output (or ``None``). :param verb: present-tense operation name ("install"), for the ``INFO`` lines and the "failed to {verb}" trail. :param past: past participle ("installed"), for the success trail. :param prep: preposition joining the package and the manager ("with", "from"). :param record_failure: appends the failed spec (or its id) to a caller-owned list. """ mgr = theme().invoked_command(manager.id) def task() -> tuple[bool, str]: with patch.object(manager, "stop_on_error", True): try: output = action(manager, spec) except NotImplementedError: logging.info(f"{mgr} does not implement {verb} operation.") except CLIError: logging.info(f"Could not {verb} {package_label(spec)} with {mgr}.") else: if output: logging.info(output) return True, f"{package_label(spec)} {past} {prep} {mgr}" with lock: record_failure(spec) return False, f"{package_label(spec)} failed to {verb} {prep} {mgr}" return task def _maintenance_work( announce: int, message: str, operation: Callable[[PackageManager], object], ) -> Callable[[PackageManager], tuple[str, dict]]: """Build a ``work`` callable for a maintenance command's fan-out. Logs ``message`` (with the themed manager name substituted for ``{}``) at the ``announce`` level, runs ``operation(manager)``, and returns ``(id, {"errors": <CLI errors raised during the run>})`` so a manager that grows its error list is marked ``βœ—`` in the trail. Shared by ``sync`` and ``cleanup``, whose work differs only in the message and the manager method. """ def work(manager: PackageManager) -> tuple[str, dict]: logging.log(announce, message.format(theme().invoked_command(manager.id))) before = len(manager.cli_errors) operation(manager) return manager.id, {"errors": manager.cli_errors[before:]} return work def _dispatch_sourced_operation( ctx: Context, packages_specs: tuple[str, ...], *, operation: Operations, action: Callable[[PackageManager, Specifier], str | None], verb: str, past: str, prep: str, label: str, done_label: str, apply_cooldown: bool = False, ) -> None: """Resolve each package spec to its source managers, then fan ``action`` out. The shared engine behind ``upgrade <packages>`` and ``remove``. Both resolve every spec to the managers that can act on it β€” the manager named in the spec, or every selected manager that reports the package installed β€” then run ``action`` per (package, manager) concurrently across managers and serially within each (see :func:`meta_package_manager.execution.collect_per_package`). A package no manager recognizes is skipped with an error; any genuine failure exits non-zero with a ``critical`` summary, matching ``install``. ``apply_cooldown`` gates each manager through :func:`cooldown_permits` first, so a release-introducing ``upgrade`` skips a manager that cannot honor an active cooldown; ``remove`` (which introduces nothing) leaves it ``False``. """ selected_managers = tuple( ctx.obj.selected_managers(implements_operation=operation), ) manager_ids = tuple(manager.id for manager in selected_managers) # Subset of selected managers implementing `installed`, queried to discover which # manager(s) a spec untied to one was installed with. sourcing_managers = tuple( ctx.obj.selected_managers( keep=manager_ids, implements_operation=Operations.installed, ), ) # Collect every (package, manager) attempt that genuinely failed, to exit non-zero. failures: list[str] = [] # Group every (package, manager) pair by manager: managers run in parallel while each # manager's own packages are processed one at a time (see collect_per_package). failures_lock = threading.Lock() tasks: list[tuple[PackageManager, Callable[[], tuple[bool, str]]]] = [] solver = Solver(packages_specs, manager_priority=manager_ids) for package_id, spec in solver.resolve_package_specs(): source_manager_ids = set() # Use the manager from the spec. if spec.manager_id: source_manager_ids.add(spec.manager_id) # Package is not bound to a manager by the user's specifiers. else: logging.info( f"{spec} not tied to a manager. Search all managers recognizing it.", ) # Find all the managers that have the package installed. for manager in sourcing_managers: if package_id in manager.installed_ids: logging.info( f"{package_id} has been installed " f"with {theme().invoked_command(manager.id)}.", ) source_manager_ids.add(manager.id) if not source_manager_ids: logging.error( f"{package_id} is not recognized by any of the selected manager. " "Skip it.", ) continue # Announce the managers we will act with (also the non-TTY signal). logging.info( f"{verb.capitalize()} {package_id} " f"with {', '.join(map(theme().invoked_command, sorted(source_manager_ids)))}", ) # One task per (package, manager); a package acted on by two managers tallies as # two. For upgrade, skip a manager that cannot honor an active cooldown. for manager_id in sorted(source_manager_ids): manager = pool.get(manager_id) if apply_cooldown and not cooldown_permits(manager): continue tasks.append(( manager, _package_task( manager, spec, failures_lock, action=action, verb=verb, past=past, prep=prep, record_failure=lambda s: failures.append(s.package_id), ), )) collect_per_package(label, done_label, tasks) if failures: logging.critical(f"Could not {verb}: " + ", ".join(sorted(set(failures))) + ".") ctx.exit(1) def _attempt_install(manager: PackageManager, spec: Specifier) -> bool: """Try installing one ``spec`` with one ``manager``, returning success. Forces the manager to raise on failure (so a botched install is recorded by the caller rather than silently swallowed), narrates the per-attempt reason at ``INFO``, and logs the CLI output on success. Returns ``True`` only when the install ran cleanly. The caller maps the result onto its ``βœ“``/``βœ—`` ledger and decides the retry/stop semantics (the tied loop records every miss; the untied priority search falls through to the next manager). Shared by both sequential install paths. """ with patch.object(manager, "stop_on_error", True): try: output = manager.install(spec.package_id, version=spec.version) except NotImplementedError: logging.info( f"{theme().invoked_command(manager.id)} " "does not implement install operation.", ) return False except CLIError: logging.info( f"Could not install {spec} " f"with {theme().invoked_command(manager.id)}.", ) return False if output: logging.info(output) return True @mpm.command(short_help="Install a package.", section=MAINTENANCE) @argument( "packages_specs", type=STRING, nargs=-1, required=True, help="A mix of plain <package_id>, simple <package_id@version> specifiers or full " "<pkg:npm/left-pad> purls.", ) # TODO: add a --force/--reinstall flag @pass_context def install(ctx, packages_specs): """Install one or more packages. This subcommand is sensible to the order of the package managers selected by the user. Installation will first proceed for all the packages found to be tied to a specific manager. Which is the case for packages provided with precise package specifiers (like purl). This will also happens in situations in which a tighter selection of managers is provided by the user. For packages whose manager is not known, or if multiple managers are candidates for the installation, mpm will try to find the best manager to install it with. Installation will be attempted with each manager, in the order they were selected. If a search for the package ID returns no result from the highest-priority manager, we will skip the installation and try the next available managers in the order of their priority. """ # Cast generator to tuple because of reuse. selected_managers = tuple( ctx.obj.selected_managers(implements_operation=Operations.install), ) manager_ids = tuple(manager.id for manager in selected_managers) logging.info( "Installation priority: > " f"{' > '.join(map(theme().invoked_command, manager_ids))}", ) solver = Solver(packages_specs, manager_priority=manager_ids) packages_per_managers = solver.resolve_specs_group_by_managers() unmatched_packages = packages_per_managers.get(None, set()) # Collect every requested spec that no manager could install, to raise a non-zero # exit code at the end of the command. unresolved_specs: list[Specifier] = [] # Packages tied to a manager (purls, or a single-manager selection) install # concurrently across managers, serial within each (see collect_per_package). An # untied package needs a priority search (install with the first manager that has # it, skip the rest), which is cross-manager-sequential; its presence drops the # whole command onto the sequential path below. if not unmatched_packages: failures_lock = threading.Lock() def make_cooldown_task(spec, mgr): # cooldown_permits() already logged why; a skip is βœ— but not unresolved, so # it never forces a non-zero exit. def task() -> tuple[bool, str]: return False, f"{package_label(spec)} skipped in {mgr} (cooldown)" return task tasks: list[tuple[PackageManager, Callable[[], tuple[bool, str]]]] = [] for manager_id, package_specs in packages_per_managers.items(): if not manager_id: continue manager = pool.get(manager_id) mgr = theme().invoked_command(manager_id) permitted = cooldown_permits(manager) for spec in package_specs: if permitted: task = _package_task( manager, spec, failures_lock, action=lambda m, s: m.install(s.package_id, version=s.version), verb="install", past="installed", prep="with", record_failure=unresolved_specs.append, ) else: task = make_cooldown_task(spec, mgr) tasks.append((manager, task)) collect_per_package("Installing", "Installed", tasks) if unresolved_specs: logging.critical( "Could not install: " + ", ".join(sorted(str(spec) for spec in unresolved_specs)) + ".", ) ctx.exit(1) return # Untied packages present: the priority search cannot fan out, so run sequentially # (see warn_jobs_ignored). warn_jobs_ignored(ctx) # Leave a per-package βœ“/βœ— ledger plus a persistent finisher (see OperationTrail), # keyed by package and its resolving manager. total = sum(len(specs) for specs in packages_per_managers.values()) op = OperationTrail(selected_managers) installed_count = 0 def trail(spec: Specifier, manager_id: str, status: str) -> None: """Map an install attempt to a ``βœ“``/``βœ—`` ledger line through ``op``. ``status`` is ``installed`` (βœ“), or ``not_found`` / ``failed`` / ``cooldown`` (βœ—). """ mgr = theme().invoked_command(manager_id) reason = { "installed": f"installed with {mgr}", "not_found": f"not found in {mgr}", "failed": f"failed to install with {mgr}", "cooldown": f"skipped in {mgr} (cooldown)", }[status] op.mark(status == "installed", f"{package_label(spec)} {reason}") # Install all packages deterministically tied to a specific manager. for manager_id, package_specs in packages_per_managers.items(): if not manager_id: continue manager = pool.get(manager_id) if not cooldown_permits(manager): # cooldown_permits() already logged why; mark the tied packages dropped. for spec in package_specs: trail(spec, manager_id, "cooldown") continue for spec in package_specs: # A tied package has exactly one candidate manager, so a miss is final: # record it as unresolved (forcing a non-zero exit) and mark the βœ— trail. if _attempt_install(manager, spec): installed_count += 1 trail(spec, manager_id, "installed") else: unresolved_specs.append(spec) trail(spec, manager_id, "failed") # Drop managers that cannot honor an active cooldown (once, not per package). eligible_managers = tuple(m for m in selected_managers if cooldown_permits(m)) for spec in unmatched_packages: installed = False for manager in eligible_managers: # Is the package available on this manager? The per-attempt reason is INFO # narration; the βœ— trail line below names the manager that missed. matches = None try: matches = tuple( manager.refiltered_search( extended=False, exact=True, query=spec.package_id, ), ) except NotImplementedError: logging.info( f"{theme().invoked_command(manager.id)} " "does not implement search operation.", ) logging.info( f"{spec.package_id} existence unconfirmed, " "try to directly install it...", ) except CLIError: logging.info( f"Could not search for {spec.package_id} " f"with {theme().invoked_command(manager.id)}.", ) trail(spec, manager.id, "not_found") continue else: if not matches: logging.info( f"No {spec.package_id} package found " f"on {theme().invoked_command(manager.id)}.", ) trail(spec, manager.id, "not_found") continue # Prevents any incomplete or bad implementation of exact search. if len(matches) != 1: msg = "Exact search returned multiple packages." raise ValueError(msg) # On a failed install, fall through to the next manager in priority order. if not _attempt_install(manager, spec): trail(spec, manager.id, "failed") continue # Stop at the first (highest-priority) manager that provides the package. installed = True installed_count += 1 trail(spec, manager.id, "installed") break if not installed: unresolved_specs.append(spec) op.finish(installed_count == total, f"Installed {installed_count}/{total} packages") # Fail with a non-zero exit code if any requested package went uninstalled by every # selected manager. if unresolved_specs: logging.critical( "Could not install: " + ", ".join(sorted(str(spec) for spec in unresolved_specs)) + ".", ) ctx.exit(1) @mpm.command(aliases=["update"], short_help="Upgrade packages.", section=MAINTENANCE) @option( "-A", "--all", is_flag=True, default=False, help="Upgrade all outdated packages. " "Will make the command ignore package IDs provided as parameters.", ) @argument( "packages_specs", type=STRING, nargs=-1, help="A mix of plain <package_id>, simple <package_id@version> specifiers or full " "<pkg:npm/left-pad> purls.", ) @pass_context def upgrade(ctx, all, packages_specs): """Upgrade one or more outdated packages. All outdated package will be upgraded by default if no specifiers are provided as arguments. I.e. assumes -A/--all option if no [PACKAGES_SPECS].... Packages recognized by multiple managers will be upgraded with each of them. You can fine-tune this behavior with more precise package specifiers (like purl) and/or tighter selection of managers. Packages unrecognized by any selected manager will be skipped. """ if not all and not packages_specs: logging.info("No package provided, assume -A/--all option.") all = True # Full upgrade: one βœ“/βœ— ledger line per manager plus a finisher (see # OperationTrail). A manager fails its line if it grows cli_errors while running. if all: if packages_specs: # Deduplicate and sort specifiers for terseness. logging.info( f"Ignore {', '.join(sorted(set(packages_specs)))} specifiers " "and proceed to a full upgrade...", ) managers = list( ctx.obj.selected_managers(implements_operation=Operations.upgrade_all), ) # Explicit --<id> picks announce loudly; an implicit "upgrade everything" stays # at DEBUG so the default run shows only the trail (matching the explicit / # implicit levels select_managers already uses for its skip messages). announce = logging.INFO if ctx.obj.user_selection else logging.DEBUG def upgrade_all_work(manager: PackageManager) -> tuple[str, dict]: mgr = theme().invoked_command(manager.id) # cooldown_permits() already logs the reason at WARNING when it blocks; # mark the manager βœ— without running its CLI. if not cooldown_permits(manager): return manager.id, { "failed": True, "label": f"{mgr} skipped (cooldown)", } logging.log(announce, f"Upgrade all outdated packages from {mgr}...") before = len(manager.cli_errors) output = manager.upgrade() if output: logging.info(output) return manager.id, {"errors": manager.cli_errors[before:]} # Full upgrade is independent per manager, so fan out concurrently with a # βœ“/βœ— trail and a success-count finisher (see collect_from_managers). collect_from_managers( "Upgrading", "Upgraded", managers, upgrade_all_work, report_state=True ) ctx.exit() _dispatch_sourced_operation( ctx, packages_specs, operation=Operations.upgrade, action=lambda m, s: m.upgrade(s.package_id, version=s.version), verb="upgrade", past="upgraded", prep="with", label="Upgrading", done_label="Upgraded", apply_cooldown=True, ) @mpm.command(aliases=["uninstall"], short_help="Remove a package.", section=MAINTENANCE) @argument( "packages_specs", type=STRING, nargs=-1, required=True, help="A mix of plain <package_id>, simple <package_id@version> specifiers or full " "<pkg:npm/left-pad> purls.", ) @pass_context def remove(ctx, packages_specs): """Remove one or more packages. Packages recognized by multiple managers will be remove with each of them. You can fine-tune this behavior with more precise package specifiers (like purl) and/or tighter selection of managers. Packages unrecognized by any selected manager will be skipped. """ _dispatch_sourced_operation( ctx, packages_specs, operation=Operations.remove, action=lambda m, s: m.remove(s.package_id), verb="remove", past="removed", prep="from", label="Removing", done_label="Removed", ) @mpm.command(short_help="Sync local package info.", section=MAINTENANCE) @pass_context def sync(ctx): """Sync local package metadata and info from external sources.""" managers = list(ctx.obj.selected_managers(implements_operation=Operations.sync)) announce = logging.INFO if ctx.obj.user_selection else logging.DEBUG # Sync is independent per manager, so fan out concurrently with a βœ“/βœ— trail and # a success-count finisher (see collect_from_managers). collect_from_managers( "Syncing", "Synced", managers, _maintenance_work(announce, "Sync {} package info...", lambda m: m.sync()), report_state=True, ) @mpm.command(short_help="Cleanup local data.", section=MAINTENANCE) @pass_context def cleanup(ctx): """Cleanup local data, temporary artifacts and removes orphaned dependencies.""" managers = list(ctx.obj.selected_managers(implements_operation=Operations.cleanup)) announce = logging.INFO if ctx.obj.user_selection else logging.DEBUG # Cleanup is independent per manager, so fan out concurrently with a βœ“/βœ— trail # and a success-count finisher (see collect_from_managers). collect_from_managers( "Cleaning up", "Cleaned", managers, _maintenance_work(announce, "Cleanup {}...", lambda m: m.cleanup()), report_state=True, ) @mpm.command( aliases=["backup", "lock", "freeze", "snapshot"], short_help="Snapshot installed packages to a TOML manifest or a Brewfile.", section=SNAPSHOTS, ) @option( "--toml", "output_format", flag_value="toml", default=True, help="Emit a TOML manifest with one section per manager. Default.", ) @option( "--brewfile", "output_format", flag_value="brewfile", help=( "Emit a Brewfile that `brew bundle install` can consume. Only managers " "natively supported by brew bundle are included (brew, cask, mas, vscode, " "npm, cargo, uv, winget, flatpak). Other managers are tallied in the " "header and excluded from the output." ), ) @option( "--overwrite", "--force", "--replace", is_flag=True, default=False, help="Allow the output file to be silently wiped out if it already exists.", ) @option( "--header/--no-header", "include_header", default=True, help="Include a metadata + warning comment block at the top of the output.", ) @option( "--merge", is_flag=True, default=False, help="TOML only. Read the provided file and add each new entry to it. " "Requires the [OUTPUT_PATH] argument.", ) @option( "--update-version", is_flag=True, default=False, help="TOML only. Read the provided file and update each existing entry with " "the version currently installed on the system. Requires the [OUTPUT_PATH] " "argument.", ) @option( "--query", type=STRING, default=None, metavar="QUERY", help="Only snapshot installed packages whose ID or name matches QUERY. Fuzzy " "by default (case-insensitive, tokenized); see --exact.", ) @option( "--exact/--fuzzy", default=False, help="With --query, require a verbatim match on the package ID or name instead " "of the default fuzzy match. No effect without --query.", ) @argument( "output_path", type=file_path(writable=True, resolve_path=True, allow_dash=True), default="-", ) @pass_context def dump( ctx, output_format, overwrite, include_header, merge, update_version, query, exact, output_path, ): """Dump installed packages to a TOML manifest or a Brewfile. By default emits TOML, one section per manager (one entry per package, keyed by package ID, with the installed version as the value). Pass ``--brewfile`` to emit a Brewfile compatible with ``brew bundle install``. With no [OUTPUT_PATH] argument, writes to stdout. TOML files are readable by ``mpm restore``. With ``--query``, restrict the snapshot to installed packages whose ID or name matches it (fuzzy by default, verbatim with ``--exact``). ``--merge`` and ``--update-version`` operate on an existing TOML file; both require the [OUTPUT_PATH] argument and neither is valid with ``--brewfile``. """ # --merge / --update-version are TOML-only. if output_format == "brewfile" and (merge or update_version): logging.critical( "--merge / --update-version cannot be combined with --brewfile.", ) ctx.exit(2) if merge and update_version: logging.critical("--merge and --update-version are mutually exclusive.") ctx.exit(2) if output_format == "brewfile": if is_stdout(output_path): if overwrite: logging.info("Ignore the --overwrite/--force/--replace option.") logging.info(f"Print Brewfile to {sys.stdout.name}") else: logging.info(f"Dump installed packages as a Brewfile into {output_path}") guard_existing_output(ctx, output_path, overwrite=overwrite) _dump_brewfile( ctx, output_path, include_header=include_header, query=query, exact=exact ) return # TOML path: preserve the existing `mpm backup` flag-validation flow so that # scripts piping the log lines through INFO-level filtering keep working. if is_stdout(output_path): if merge: logging.critical( "--merge requires the [OUTPUT_PATH] argument to point to a file.", ) ctx.exit(2) if update_version: logging.critical( "--update-version requires the [OUTPUT_PATH] argument to point " "to a file.", ) ctx.exit(2) if overwrite: logging.info("Ignore the --overwrite/--force/--replace option.") logging.info(f"Print installed package list to {sys.stdout.name}") else: if merge: logging.info(f"Merge all installed packages into {output_path}") elif update_version: logging.info( f"Update in-place all versions of installed packages " f"found in {output_path}", ) else: logging.info(f"Dump all installed packages into {output_path}") if output_path.exists(): if overwrite: logging.warning("Target file exist and will be overwritten.") else: if merge or update_version: logging.info("Ignore the --overwrite/--force/--replace option.") else: logging.critical("Target file exist and will be overwritten.") ctx.exit(2) elif merge: logging.critical("--merge requires an existing file.") ctx.exit(2) elif update_version: logging.critical("--update-version requires an existing file.") ctx.exit(2) if output_path.suffix.lower() != ".toml": logging.critical("Target file is not a TOML file.") ctx.exit(2) _dump_toml( ctx, output_path, include_header=include_header, merge=merge, update_version=update_version, query=query, exact=exact, ) def _dump_toml( ctx, output_path, *, include_header: bool, merge: bool = False, update_version: bool = False, query: str | None = None, exact: bool = False, ) -> None: """Render the installed inventory as a TOML manifest. Supports the same three modes the historical ``mpm backup`` exposed: a one-shot dump, ``--merge`` (add new entries to an existing file), and ``--update-version`` (refresh the version of entries already in the file). Callers are expected to have validated flag combinations and output-path constraints upstream. """ installed_data: dict[str, dict[str, str]] = {} fields = ("id", "installed_version") if merge or update_version: installed_data = tomllib.loads(output_path.read_text(encoding="utf-8")) content = "" if include_header: content = ( f"# Generated by mpm v{__version__}.\n" f"# Timestamp: {datetime.now(tz=timezone.utc).isoformat()}.\n\n" ) managers = list( ctx.obj.selected_managers(implements_operation=Operations.installed) ) def fetch(manager: PackageManager) -> tuple[str, dict]: logging.info( f"Dumping packages from {theme().invoked_command(manager.id)}...", ) packages = tuple( packages_asdict( _filter_matches(manager.installed_or_empty(), query, exact=exact), fields, ) ) return manager.id, { "packages": packages, "errors": list({e.error for e in manager.cli_errors}), } # Query each manager's installed packages concurrently, then assemble the # manifest in manager order (see collect_from_managers). for manager_id, data in collect_from_managers("Dumping", "Dumped", managers, fetch): for pkg in data["packages"]: if update_version: if pkg["id"] in installed_data.get(manager_id, {}): installed_data[manager_id][pkg["id"]] = str( pkg["installed_version"], ) else: installed_data.setdefault(manager_id, {})[pkg["id"]] = str( pkg["installed_version"], ) if installed_data.get(manager_id): installed_data[manager_id] = dict( sorted( installed_data[manager_id].items(), key=lambda i: (i[0].lower(), i[0]), ), ) content += "\n".join( tomli_w.dumps({manager_id: packages}) for manager_id, packages in installed_data.items() ) echo(content, file=prep_path(output_path)) if ctx.obj.summary: print_summary(Counter({k: len(v) for k, v in installed_data.items()})) def _dump_brewfile( ctx, output_path, *, include_header: bool, query: str | None = None, exact: bool = False, ) -> None: """Render the installed inventory as a Brewfile. Filters selected managers down to those with a configured :py:attr:`PackageManager.brewfile_entry_type`. Counts packages from skipped managers so the header can show what was dropped, and emits a stderr warning for any skipped manager that defines :py:attr:`brewfile_skip_warning` (used by ``vscodium`` to flag the silent-misinstall risk). """ managers = list( ctx.obj.selected_managers(implements_operation=Operations.installed) ) def fetch(manager: PackageManager) -> tuple[str, dict]: packages = tuple( _filter_matches(manager.installed_or_empty(), query, exact=exact) ) return manager.id, { "packages": packages, "errors": list({e.error for e in manager.cli_errors}), } # Query each manager's installed packages concurrently, then build the Brewfile # from the gathered data (see collect_from_managers). results = collect_from_managers("Reading", "Read", managers, fetch) packages_by_manager = {mid: data["packages"] for mid, data in results} errored = {mid for mid, data in results if data["errors"]} mappable_managers = [] skipped_counts: Counter[str] = Counter() for manager in managers: if manager.brewfile_entry_type is None: # Drop managers whose CLI failed (installed_or_empty already warned), # so they stay out of the header tally as before. if manager.id in errored: continue installed_count = len(packages_by_manager.get(manager.id, ())) skipped_counts[manager.id] = installed_count if installed_count and manager.brewfile_skip_warning: logging.warning( manager.brewfile_skip_warning.format(count=installed_count), ) continue mappable_managers.append(manager) content = build_brewfile( mappable_managers, packages_by_manager=packages_by_manager, include_header=include_header, skipped_counts=skipped_counts, platform=current_platform().name, ) echo(content, file=prep_path(output_path), nl=False) if ctx.obj.summary: section_counts: Counter[str] = Counter() for line in content.splitlines(): if not line or line.startswith("#"): continue entry_type = line.split(" ", 1)[0] section_counts[entry_type] += 1 print_summary(section_counts) @mpm.command( short_help="Install packages referenced in TOML files.", section=SNAPSHOTS, ) @argument("toml_files", type=File("r"), required=True, nargs=-1) @pass_context def restore(ctx, toml_files): """Read TOML files then install or upgrade each package referenced in them.""" # The sections are independent (each ties its packages to one manager), so restore # fans out across managers (see collect_per_package). The one ordering need is a # not-yet-built feature: install cask's mas binary before the [mas] section runs # (use-case: dotfiles). When it lands it must become parallel-within-dependency- # levels rather than this flat fan-out. # Cast generator to tuple because of reuse across TOML files and the trail. selected_managers = tuple( ctx.obj.selected_managers(implements_operation=Operations.install), ) # Collect every package a manager failed to install, to raise a non-zero exit code # at the end (matching install, remove and upgrade). restore_failures: list[str] = [] failures_lock = threading.Lock() # Gather one task per referenced (package, manager) across all the input files. tasks: list[tuple[PackageManager, Callable[[], tuple[bool, str]]]] = [] for toml_input in toml_files: is_stdin = isinstance(toml_input, TextIOWrapper) if is_stdin: toml_input.reconfigure(encoding="utf-8") toml_filepath = toml_input.name toml_content = toml_input.read() else: toml_filepath = Path(toml_input.name).resolve() toml_content = toml_filepath.read_text(encoding="utf-8") logging.info(f"Load package list from {toml_filepath}") doc = tomllib.loads(toml_content) # List unrecognized sections. ignored_sections = [ f"[{section}]" for section in doc if section not in pool.all_manager_ids ] if ignored_sections: plural = "s" if len(ignored_sections) > 1 else "" sections = ", ".join(ignored_sections) logging.info(f"Ignore {sections} section{plural}.") for manager in selected_managers: if manager.id not in doc: logging.info( f"No [{theme().invoked_command(manager.id)}] section found.", ) continue logging.info(f"Restore {theme().invoked_command(manager.id)} packages...") for package_id, version in doc[manager.id].items(): spec = Specifier( raw_spec=f"pkg:{manager.id}/{package_id}{VERSION_SEP}{version}", package_id=package_id, manager_id=manager.id, version=str(version), ) tasks.append(( manager, _package_task( manager, spec, failures_lock, action=lambda m, s: m.install(s.package_id, version=s.version), verb="install", past="installed", prep="with", record_failure=lambda s: restore_failures.append(s.package_id), ), )) collect_per_package("Restoring", "Restored", tasks) # Fail with a non-zero exit code if any referenced package could not be installed. if restore_failures: logging.critical( "Could not restore: " + ", ".join(sorted(set(restore_failures))) + ".", ) ctx.exit(1) @mpm.command( short_help="Export installed packages to a SBOM document.", section=SBOM_SECTION, ) @option( "--spdx/--cyclonedx", default=True, help="SBOM standard to export to.", ) @option( "--format", "export_format", type=EnumChoice(ExportFormat), help=f"File format of the export. Defaults to JSON for {sys.stdout.name}. If not " "provided, will be autodetected from file extension.", ) @option( "--overwrite", "--force", "--replace", is_flag=True, default=False, help="Allow the target file to be silently wiped out if it already exists.", ) @option( "--bundled/--minimal", default=True, help=( "Bundled mode (the default) queries each manager for richer " "metadata (license, supplier, homepage, checksums, declared " "dependencies) and merges per-package upstream SBOM documents into " "the aggregate when the manager publishes them (like Homebrew's " "HOMEBREW_SBOM=1 per-formula files). Minimal mode lists installed " "packages with the bare inventory data (name, version, purl) and " "skips the metadata extractors entirely. Bundled mode is slower " "because it may shell out or read on-disk SBOM files per package; " "pick --minimal for fast inventory snapshots." ), ) @option( "--query", type=STRING, default=None, metavar="QUERY", help="Only export installed packages whose ID or name matches QUERY. Fuzzy " "by default (case-insensitive, tokenized); see --exact.", ) @option( "--exact/--fuzzy", default=False, help="With --query, require a verbatim match on the package ID or name instead " "of the default fuzzy match. No effect without --query.", ) @argument( "export_path", type=file_path(writable=True, resolve_path=True, allow_dash=True), default="-", ) @pass_context def sbom(ctx, spdx, export_format, overwrite, bundled, query, exact, export_path): """Export list of installed packages to a SPDX or CycloneDX file. With ``--query``, restrict the export to installed packages whose ID or name matches it (fuzzy by default, verbatim with ``--exact``). """ standard = "SPDX" if spdx else "CycloneDX" if is_stdout(export_path): if overwrite: logging.info("Ignore the --overwrite/--force/--replace option.") logging.info(f"Print {standard} export to {sys.stdout.name}") else: logging.info(f"Export installed packages in {standard} to {export_path}") guard_existing_output(ctx, export_path, overwrite=overwrite) # <stdout> format defaults to JSON. if is_stdout(export_path): if not export_format: export_format = ExportFormat.JSON # If no export format has been provided, guess it from file name. else: guessed_format = SBOM.autodetect_export_format(export_path) if not export_format: if not guessed_format: # On Python 3.10, ``ExportFormat`` extends ``backports.strenum.StrEnum`` # whose typeshed stub omits ``__iter__``; iteration is provided by the # ``EnumMeta`` metaclass at runtime. supported = ", ".join( f.value for f in ExportFormat # type: ignore[attr-defined] ) logging.critical( f"Cannot guess export format from {export_path.name!r}. " f"Use --format to pick one of: {supported}." ) ctx.exit(2) export_format = guessed_format elif guessed_format and export_format != guessed_format: logging.critical(f"Selected {export_format} does not match file extension.") ctx.exit(2) sbom_class: type[SBOM] if spdx: if not spdx_support: raise UsageError( "SPDX SBOM generation requires the [sbom] extra. " "Install with: pip install meta-package-manager[sbom]", ) sbom_class = SPDX else: if not cyclonedx_support: raise UsageError( "CycloneDX SBOM generation requires the [sbom] extra. " "Install with: pip install meta-package-manager[sbom]", ) if export_format not in (ExportFormat.JSON, ExportFormat.XML): logging.critical(f"{standard} does not support {export_format} format.") ctx.exit(2) sbom_class = CycloneDX sbom = sbom_class(export_format) sbom.init_doc() managers = list( ctx.obj.selected_managers(implements_operation=Operations.installed) ) by_id = {manager.id: manager for manager in managers} def fetch(manager: PackageManager) -> tuple[str, dict]: logging.info(f"Export packages from {theme().invoked_command(manager.id)}...") installed_packages = tuple( _filter_matches(manager.installed_or_empty(), query, exact=exact) ) # In --bundled mode, enrich each package with its metadata here too, so the # slow per-manager metadata fetch parallelizes alongside the listing. enriched = None if bundled and installed_packages: try: enriched = list(manager.package_metadata_batch(installed_packages)) except Exception as exc: # noqa: BLE001 logging.info( f"Falling back to minimal SBOM data for " f"{theme().invoked_command(manager.id)}: {exc}" ) return manager.id, { "packages": installed_packages, "enriched": enriched, "errors": list({e.error for e in manager.cli_errors}), } # Query (and, for --bundled, enrich) each manager concurrently, then add the # packages to the document in manager order (see collect_from_managers). for manager_id, data in collect_from_managers( "Exporting", "Exported", managers, fetch ): installed_packages = data["packages"] if not installed_packages: continue manager = by_id[manager_id] if data["enriched"] is not None: for package, metadata in data["enriched"]: sbom.add_package(manager, package, metadata) else: for package in installed_packages: sbom.add_package(manager, package) sbom.finalize() if ctx.obj.summary: print_summary(*sbom_summary(sbom, bundled)) echo(sbom.export(), file=prep_path(export_path))