Source code for apt_mirror_updater.http

# Automated, robust apt-get mirror selection for Debian and Ubuntu.
#
# Author: Peter Odding <peter@peterodding.com>
# Last Change: April 15, 2020
# URL: https://apt-mirror-updater.readthedocs.io

"""Simple, robust and concurrent HTTP requests (designed for one very narrow use case)."""

# Standard library modules.
import logging
import multiprocessing
import signal

# External dependencies.
from humanfriendly import Timer, format_size
from six.moves.urllib.request import urlopen
from stopit import SignalTimeout, TimeoutException

# Initialize a logger for this module (named after the module itself so that
# its output can be filtered or configured independently by the application).
logger = logging.getLogger(__name__)

# Stop the `stopit' logger from logging tracebacks: raising its level to ERROR
# suppresses every message that `stopit' logs below that level.
logging.getLogger('stopit').setLevel(logging.ERROR)


def fetch_url(url, timeout=10, retry=False, max_attempts=3):
    """
    Fetch a URL, optionally retrying on failure.

    :param url: The URL to fetch (a string).
    :param timeout: The maximum time in seconds that's allowed to pass before
                    a request is aborted (a number, defaults to 10 seconds).
                    The limit applies to each attempt individually.
    :param retry: Whether to retry on failure (defaults to :data:`False`).
    :param max_attempts: The maximum number of attempts when retrying is
                         enabled (an integer, defaults to three). Values
                         smaller than one are treated as one, so at least one
                         request is always made.
    :returns: The response body (a byte string).
    :raises: Any of the following exceptions can be raised:

             - :exc:`NotFoundError` when the URL returns a 404 status code.
             - :exc:`InvalidResponseError` when the URL returns a status code
               that isn't 200.
             - `stopit.TimeoutException`_ when the request takes longer
               than `timeout` seconds (refer to the linked documentation for
               details).
             - Any exception raised by Python's standard library in the last
               attempt (assuming all attempts raise an exception).

    The response object is closed before this function returns or raises, so
    the underlying connection isn't left open.

    .. _stopit.TimeoutException: https://pypi.org/project/stopit/#exception
    """
    # Imported locally so this function carries its own dependency.
    from contextlib import closing

    # Guard against a zero or negative `max_attempts`, which would otherwise
    # skip the loop entirely and silently return None without any request.
    attempts = max(1, max_attempts)
    timer = Timer()
    logger.debug("Fetching %s ..", url)
    for i in range(1, attempts + 1):
        try:
            with SignalTimeout(timeout, swallow_exc=False):
                # closing() guarantees that response.close() is called on
                # every exit path (success, unexpected status, timeout) and
                # doesn't require the response object itself to support the
                # `with' statement.
                with closing(urlopen(url)) as response:
                    # NOTE(review): urlopen() may itself raise an HTTP error
                    # for some error statuses before this check is reached;
                    # confirm against the urllib documentation.
                    status_code = response.getcode()
                    if status_code != 200:
                        exc_type = (NotFoundError if status_code == 404 else InvalidResponseError)
                        raise exc_type("URL returned unexpected status code %s! (%s)" % (status_code, url))
                    response_body = response.read()
                logger.debug("Took %s to fetch %s.", timer, url)
                return response_body
        except (NotFoundError, TimeoutException):
            # We never retry 404 responses and timeouts.
            raise
        except Exception as e:
            if retry and i < attempts:
                logger.warning("Failed to fetch %s, retrying (%i/%i, error was: %s)", url, i, attempts, e)
            else:
                # Retrying is disabled or this was the last attempt, so the
                # most recent exception is propagated to the caller.
                raise
def fetch_concurrent(urls, concurrency=None):
    """
    Download a collection of URLs in parallel with a :mod:`multiprocessing` pool.

    :param urls: An iterable of URLs (strings).
    :param concurrency: The number of worker processes to use (an integer).
                        When omitted, the value computed by
                        :func:`get_default_concurrency()` is used.
    :returns: A list of tuples like those returned by :func:`fetch_worker()`.

    If anything goes wrong while the URLs are being fetched, the worker
    processes are terminated before the exception is propagated.
    """
    concurrency = get_default_concurrency() if concurrency is None else concurrency
    workers = multiprocessing.Pool(concurrency)
    try:
        # A chunk size of one hands out the URLs to the workers one at a time.
        responses = workers.map(fetch_worker, urls, chunksize=1)
        # Orderly shutdown: stop accepting work, then wait for the workers.
        workers.close()
        workers.join()
        return responses
    except Exception:
        # Forceful shutdown: kill the workers, reap them and re-raise.
        workers.terminate()
        workers.join()
        raise
def get_default_concurrency():
    """
    Compute the default number of workers for :func:`fetch_concurrent()`.

    The result is twice the number of CPUs reported by
    :func:`multiprocessing.cpu_count()`, with a lower bound of four.

    :returns: A positive integer number.
    """
    twice_the_cpus = multiprocessing.cpu_count() * 2
    return twice_the_cpus if twice_the_cpus > 4 else 4
def fetch_worker(url):
    """
    Download a single URL on behalf of :func:`fetch_concurrent()`.

    Failures are not propagated: when :func:`fetch_url()` raises an exception
    it is logged at debug level and :data:`None` is reported as the data.

    :param url: The URL to fetch (a string).
    :returns: A tuple of three values:

              1. The URL that was fetched (a string).
              2. The data that was fetched (the value returned by
                 :func:`fetch_url()`, or :data:`None` on failure).
              3. The number of seconds it took to fetch the URL (a number).
    """
    # Ignore Control-C instead of raising KeyboardInterrupt because (due to a
    # quirk in multiprocessing) this can cause the parent and child processes
    # to get into a deadlock kind of state where only Control-Z will get you
    # your precious terminal back; super annoying IMHO.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    stopwatch = Timer()
    # Assume failure until the download has actually succeeded.
    payload = None
    try:
        payload = fetch_url(url, retry=False)
    except Exception as error:
        logger.debug("Failed to fetch %s! (%s)", url, error)
    else:
        # Average transfer rate in bytes per second, rendered human readable.
        bytes_per_second = round(len(payload) / stopwatch.elapsed_time, 2)
        throughput = format_size(bytes_per_second)
        logger.debug("Downloaded %s at %s per second.", url, throughput)
    return url, payload, stopwatch.elapsed_time
class InvalidResponseError(Exception):

    """
    The exception that :func:`fetch_url()` raises for unexpected responses.

    It signals that a URL returned a status code that isn't 200.
    """
class NotFoundError(InvalidResponseError):

    """
    The exception that :func:`fetch_url()` raises for missing resources.

    It signals that a URL returned a 404 status code. Because it is a subclass
    of :exc:`InvalidResponseError`, handlers for that exception catch it too.
    """