# Copyright Kevin Deldycke <kevin@deldycke.com> and contributors.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
"""PyPI API client for package metadata lookups.
Provides a shared HTTP client and domain-specific query functions used by
{mod}`repomatic.changelog` (release dates, yanked status) and
{mod}`repomatic.renovate` (source repository discovery).
"""
from __future__ import annotations

import json
import logging
from http.client import HTTPException
from typing import NamedTuple
from urllib.error import URLError
from urllib.parse import urlencode
from urllib.request import Request, urlopen

from .cache import get_cached_response, store_response
from .config import load_repomatic_config
# URL templates with named `{...}` placeholders. The ones used in this module
# are filled in with `str.format()`.
PYPI_API_URL = "https://pypi.org/pypi/{package}/json"
"""PyPI JSON API URL for fetching all release metadata for a package."""
PYPI_PROJECT_URL = "https://pypi.org/project/{package}/{version}/"
"""PyPI project page URL for a specific version."""
PYPI_PROVENANCE_URL = (
"https://pypi.org/integrity/{package}/{version}/{filename}/provenance"
)
"""PyPI integrity API endpoint exposing PEP 740 attestation bundles for a file.
The response includes a ``publisher`` object per bundle that names the OIDC
identity used to upload (kind, repository, workflow filename, environment).
This is the only public surface where the OIDC ``job_workflow_ref`` claim is
observable: project-level Trusted Publisher settings live behind the owner-only
``/manage/project/<name>/settings/publishing/`` page.
"""
PYPI_TRUSTED_PUBLISHER_SETTINGS_URL = (
"https://pypi.org/manage/project/{package}/settings/publishing/"
)
"""Owner-only page where Trusted Publisher entries are registered."""
# Bare filename only (no directory component).
PYPI_TRUSTED_PUBLISHER_WORKFLOW = "release.yaml"
"""Workflow filename each downstream registers as the Trusted Publisher.
The caller-side ``publish-pypi`` job is appended to ``release.yaml`` in every
downstream repo (see ``repomatic/data/release-publish-pypi-job.yaml``), and the
composite action it invokes inherits the calling job's OIDC context. The OIDC
``job_workflow_ref`` claim therefore names this file: that is what the PyPI
Trusted Publisher entry must match.
"""
[docs]
def pypi_trusted_publisher_settings_url(
    package: str,
    *,
    owner: str | None = None,
    repository: str | None = None,
    workflow_filename: str | None = None,
    environment: str | None = None,
) -> str:
    """Return the PyPI Trusted Publisher settings page URL of a project.

    Called with only ``package``, the bare settings URL is returned. As soon
    as at least one GitHub publisher field is non-empty, a query string is
    appended so that PyPI's settings page opens the GitHub tab with the form
    pre-populated: see the ``manage_project_oidc_publishers_prefill`` view in
    [pypi/warehouse](https://github.com/pypi/warehouse/blob/main/warehouse/manage/views/oidc_publishers.py).

    Fields that are ``None`` or empty strings are left out of the query.

    :param package: PyPI project name.
    :param owner: GitHub owner (user or org) prefilled in the form.
    :param repository: GitHub repository name prefilled in the form.
    :param workflow_filename: Workflow filename prefilled in the form (e.g.,
        :data:`PYPI_TRUSTED_PUBLISHER_WORKFLOW`).
    :param environment: GitHub Actions environment name prefilled in the form.
    :return: The settings URL, optionally with a ``?provider=github&…`` suffix.
    """
    settings_url = PYPI_TRUSTED_PUBLISHER_SETTINGS_URL.format(package=package)

    # `provider=github` comes first: it selects the GitHub tab and routes the
    # remaining parameters to the GitHub publisher form.
    params = {"provider": "github"}
    for name, value in (
        ("owner", owner),
        ("repository", repository),
        ("workflow_filename", workflow_filename),
        ("environment", environment),
    ):
        if value:
            params[name] = value

    # Only the provider selector is present: nothing to prefill.
    if len(params) == 1:
        return settings_url
    return f"{settings_url}?{urlencode(params)}"
# NOTE(review): the first character of this label looks like a mis-decoded
# (mojibake) emoji rather than an intentional Greek letter — confirm the
# intended glyph against the canonical file before relying on it.
PYPI_LABEL = "π PyPI"
"""Display label for PyPI releases in admonitions."""
# Keys in PyPI `project_urls` that typically point to a changelog,
# checked in priority order.
# The first key holding a non-empty value wins (see `get_changelog_url`).
_CHANGELOG_URL_KEYS = (
"Changelog",
"Changes",
"Change Log",
"Release Notes",
"History",
)
# Keys in PyPI `project_urls` that typically point to a GitHub repository,
# checked in priority order.
# A key only matches when its value contains "github.com" (see
# `get_source_url`); lookups are exact, hence both "Source Code" and
# "Source code".
_SOURCE_URL_KEYS = (
"Source",
"Source Code",
"Source code",
"Repository",
"Code",
"Homepage",
)
def _fetch_json(package: str) -> dict | None:
    """Fetch the full JSON metadata for a PyPI package.

    Results are cached under the `pypi` namespace. Freshness TTL is read
    from `CacheConfig.pypi_ttl`. A cached entry that cannot be parsed into a
    JSON object is ignored and the metadata is fetched again. A fresh
    response is written to the cache only when the TTL is positive.

    :param package: The PyPI package name.
    :return: Parsed JSON object, or `None` on any failure (network error,
        HTTP error status, truncated or malformed response, or a payload
        that is not a JSON object).
    """
    ttl = load_repomatic_config().cache.pypi_ttl
    cached = get_cached_response("pypi", package, ttl)
    if cached is not None:
        try:
            payload = json.loads(cached)
        except ValueError:
            # `ValueError` covers both `json.JSONDecodeError` and the
            # `UnicodeDecodeError` raised for undecodable bytes. A corrupt
            # cache entry is not fatal: fall through to a fresh lookup.
            payload = None
        if isinstance(payload, dict):
            return payload

    url = PYPI_API_URL.format(package=package)
    request = Request(url, headers={"Accept": "application/json"})
    try:
        with urlopen(request, timeout=10) as response:
            raw = response.read()
        result = json.loads(raw)
    except (URLError, OSError, HTTPException, ValueError) as exc:
        # `URLError`/`HTTPError` are raised while opening the URL. Failures
        # while *reading* the body are not wrapped by urllib: they surface as
        # plain `OSError` (timeouts, connection resets) or `HTTPException`
        # (e.g. `IncompleteRead`). `ValueError` covers malformed JSON and
        # undecodable bytes.
        logging.debug(f"PyPI lookup failed for {package}: {exc}")
        return None

    # Callers use `.get()` on the result, so anything but a JSON object is
    # treated as a failed lookup and never reaches the cache.
    if not isinstance(result, dict):
        logging.debug(f"PyPI lookup for {package} did not return a JSON object")
        return None

    if ttl > 0:
        store_response("pypi", package, raw)
    return result
[docs]
class PyPIRelease(NamedTuple):
    """What PyPI reports about one released version of a package."""

    date: str
    """Upload date of the earliest file of the version, as `YYYY-MM-DD`."""

    yanked: bool
    """`True` only when every file of the version is yanked."""

    package: str
    """Name of the PyPI package the release was looked up under.

    Renamed projects keep their older versions under the former package
    name, so PyPI URLs for those versions must be built from that name and
    not from the current one.
    """
[docs]
def get_release_dates(package: str) -> dict[str, PyPIRelease]:
    """Map every version of a package to its release date and yanked status.

    All data comes from a single metadata lookup. The release date of a
    version is the **earliest** upload time found among its distribution
    files, truncated to its first 10 characters. A version counts as yanked
    only when **all** of its files are yanked. Versions without files, or
    whose files carry no upload time, are left out.

    :param package: The PyPI package name.
    :return: Dict mapping version strings to {class}`PyPIRelease` tuples.
        Empty dict if the package is not found or the request fails.
    """
    metadata = _fetch_json(package)
    if metadata is None:
        return {}

    releases: dict[str, PyPIRelease] = {}
    for version, files in metadata.get("releases", {}).items():
        if not files:
            continue
        # Keep the leading `YYYY-MM-DD` part of each timestamp, so that
        # string ordering matches chronological ordering.
        days = [
            entry["upload_time"][:10] for entry in files if entry.get("upload_time")
        ]
        if not days:
            continue
        releases[version] = PyPIRelease(
            date=min(days),
            # A single non-yanked file keeps the whole version alive.
            yanked=all(entry.get("yanked", False) for entry in files),
            package=package,
        )
    return releases
[docs]
def get_source_url(package: str) -> str | None:
    """Find the GitHub repository URL advertised by a PyPI package.

    Looks at the `project_urls` of the package metadata. The well-known
    source keys are tried first, in priority order. If none of them holds a
    GitHub link, every remaining value is scanned. The first URL containing
    `github.com` wins and is returned without trailing slashes or a `.git`
    suffix.

    :param package: The PyPI package name.
    :return: The GitHub repository URL, or `None` if not found.
    """
    metadata = _fetch_json(package)
    if metadata is None:
        return None

    urls: dict[str, str] = metadata.get("info", {}).get("project_urls") or {}

    # Priority keys first (missing ones yield an empty string that can never
    # match), then every declared URL as a fallback.
    preferred = [urls.get(label, "") for label in _SOURCE_URL_KEYS]
    for link in (*preferred, *urls.values()):
        if "github.com" in link:
            return link.rstrip("/").removesuffix(".git")
    return None
[docs]
class TrustedPublisher(NamedTuple):
    """OIDC publisher identity read from a PyPI provenance bundle."""

    kind: str
    """Kind of publisher, e.g., ``"GitHub"`` or ``"GitLab"``."""

    repository: str
    """Repository slug (``"owner/name"`` for GitHub publishers)."""

    workflow: str
    """Workflow filename within ``.github/workflows/`` (e.g., ``"release.yaml"``)."""

    environment: str | None
    """GitHub Actions environment name, or ``None`` when the publisher was not
    scoped to one."""
[docs]
def get_latest_release_file(package: str) -> tuple[str, str] | None:
    """Return ``(version, filename)`` for the latest non-yanked release on PyPI.

    Each version is dated by the earliest upload time among its non-yanked
    files, and the version with the most recent such date is selected. Ties
    are broken by comparing the version strings, then the filenames. The
    returned file is the first non-yanked wheel of that version, or its first
    non-yanked file when it ships no wheel.

    :param package: The PyPI package name.
    :return: Tuple of ``(version, filename)``, or ``None`` if the package
        has no published releases or the request fails.
    """
    metadata = _fetch_json(package)
    if metadata is None:
        return None

    releases: dict[str, list[dict]] = metadata.get("releases") or {}

    # One `(earliest upload time, version, filename)` entry per usable version.
    ranked: list[tuple[str, str, str]] = []
    for version, files in releases.items():
        available = [entry for entry in files if not entry.get("yanked", False)]
        timestamps = [
            entry["upload_time"] for entry in available if entry.get("upload_time")
        ]
        # Also skips fully yanked versions: no files means no timestamps.
        if not timestamps:
            continue
        wheels = [
            entry for entry in available if entry.get("filename", "").endswith(".whl")
        ]
        picked = wheels[0] if wheels else available[0]
        ranked.append((min(timestamps), version, picked["filename"]))

    if not ranked:
        return None

    # Tuples order by upload time first, so the last one is the latest release.
    _, version, filename = sorted(ranked)[-1]
    return version, filename
[docs]
def _publisher_from_bundle(bundle: object) -> TrustedPublisher | None:
    """Build a `TrustedPublisher` from one attestation bundle, or `None` if malformed."""
    if not isinstance(bundle, dict):
        return None
    publisher = bundle.get("publisher")
    if not isinstance(publisher, dict):
        return None

    kind = publisher.get("kind")
    repository = publisher.get("repository")
    workflow = publisher.get("workflow")
    # These three fields are mandatory: without them the entry is unusable.
    if not (
        isinstance(kind, str)
        and isinstance(repository, str)
        and isinstance(workflow, str)
    ):
        return None

    # The environment is optional: anything that is not a string is dropped.
    environment = publisher.get("environment")
    if not isinstance(environment, str):
        environment = None

    return TrustedPublisher(
        kind=kind,
        repository=repository,
        workflow=workflow,
        environment=environment,
    )


def get_trusted_publishers(
    package: str, version: str, filename: str
) -> list[TrustedPublisher] | None:
    """Fetch PEP 740 provenance for a file and extract publisher entries.

    Calls :data:`PYPI_PROVENANCE_URL` and parses the ``attestation_bundles``
    array. Each bundle's ``publisher`` object names the OIDC identity that
    uploaded the file. Bundles without a well-formed publisher (missing or
    non-string ``kind``, ``repository`` or ``workflow``) are skipped.

    :param package: The PyPI package name.
    :param version: The release version (e.g., ``"1.2.3"``).
    :param filename: The distribution filename (e.g.,
        ``"my_pkg-1.2.3-py3-none-any.whl"``).
    :return: List of :class:`TrustedPublisher` entries (possibly empty when
        provenance exists but no bundles are present), or ``None`` when the
        endpoint returns 404 or any network/parse error occurs (signal that
        no provenance is available rather than that none was registered).
        A response that is not a JSON object is treated as a parse error.
    """
    url = PYPI_PROVENANCE_URL.format(
        package=package, version=version, filename=filename
    )
    request = Request(url, headers={"Accept": "application/json"})
    try:
        with urlopen(request, timeout=10) as response:
            raw = response.read()
        data = json.loads(raw)
    except (URLError, OSError, HTTPException, ValueError) as exc:
        # `URLError`/`HTTPError` (including the 404 of files without
        # provenance) are raised while opening the URL. Failures while
        # *reading* the body surface as plain `OSError` or `HTTPException`,
        # and `ValueError` covers malformed JSON and undecodable bytes.
        logging.debug(f"PyPI provenance lookup failed for {package} {version}: {exc}")
        return None

    if not isinstance(data, dict):
        logging.debug(
            f"PyPI provenance lookup for {package} {version} "
            "did not return a JSON object"
        )
        return None

    raw_bundles = data.get("attestation_bundles")
    bundles = raw_bundles if isinstance(raw_bundles, list) else []
    parsed = (_publisher_from_bundle(bundle) for bundle in bundles)
    return [publisher for publisher in parsed if publisher is not None]
[docs]
def get_changelog_url(package: str) -> str | None:
    """Find the changelog URL advertised by a PyPI package.

    Looks at the `project_urls` of the package metadata and returns the
    value of the first well-known changelog key, in priority order, that
    holds a non-empty URL. Trailing slashes are stripped.

    :param package: The PyPI package name.
    :return: The changelog URL, or `None` if not found.
    """
    metadata = _fetch_json(package)
    if metadata is None:
        return None

    urls: dict[str, str] = metadata.get("info", {}).get("project_urls") or {}

    # Missing keys and empty values are both skipped.
    link = next(
        (urls[label] for label in _CHANGELOG_URL_KEYS if urls.get(label)),
        None,
    )
    if link is None:
        return None
    return link.rstrip("/")