Implement concurrent downloads.

This commit is contained in:
Markus Reiter 2024-07-14 11:42:22 -04:00
parent 4c29d4c74b
commit b297be77a1
No known key found for this signature in database
GPG Key ID: 245293B51702655B
12 changed files with 264 additions and 108 deletions

View File

@ -84,4 +84,4 @@ gem "plist"
gem "ruby-macho"
gem "sorbet-runtime"
gem "warning"
gem 'whirly'
gem "whirly"

View File

@ -8,7 +8,7 @@ require "cask/quarantine"
module Cask
# A download corresponding to a {Cask}.
class Download < ::Downloadable
class Download < Downloadable
include Context
attr_reader :cask
@ -20,6 +20,11 @@ module Cask
@quarantine = quarantine
end
sig { override.returns(String) }
def name
cask.token
end
sig { override.returns(T.nilable(::URL)) }
def url
return if cask.url.nil?
@ -88,6 +93,11 @@ module Cask
cask.token
end
sig { override.returns(String) }
def download_type
"cask"
end
private
def quarantine(path)

View File

@ -5,6 +5,8 @@ require "abstract_command"
require "formula"
require "fetch"
require "cask/download"
require "retryable_download"
require "whirly"
module Homebrew
module Cmd
@ -25,6 +27,7 @@ module Homebrew
"(Pass `all` to download for all architectures.)"
flag "--bottle-tag=",
description: "Download a bottle for given tag."
flag "--concurrency=", description: "Number of concurrent downloads.", hidden: true
switch "--HEAD",
description: "Fetch HEAD version instead of stable version."
switch "-f", "--force",
@ -67,6 +70,17 @@ module Homebrew
named_args [:formula, :cask], min: 1
end
def concurrency
@concurrency ||= args.concurrency&.to_i || 1
end
def download_queue
@download_queue ||= begin
require "download_queue"
DownloadQueue.new(concurrency)
end
end
sig { override.void }
def run
Formulary.enable_factory_cache!
@ -125,13 +139,10 @@ module Homebrew
next
end
begin
bottle.fetch_tab
rescue DownloadError
retry if retry_fetch?(bottle)
raise
if (manifest_resource = bottle.github_packages_manifest_resource)
fetch_downloadable(manifest_resource)
end
fetch_formula(bottle)
fetch_downloadable(bottle)
rescue Interrupt
raise
rescue => e
@ -147,14 +158,14 @@ module Homebrew
next if fetched_bottle
fetch_formula(formula)
fetch_downloadable(formula.resource)
formula.resources.each do |r|
fetch_resource(r)
r.patches.each { |p| fetch_patch(p) if p.external? }
fetch_downloadable(r)
r.patches.each { |patch| fetch_downloadable(patch.resource) if patch.external? }
end
formula.patchlist.each { |p| fetch_patch(p) if p.external? }
formula.patchlist.each { |patch| fetch_downloadable(patch.resource) if patch.external? }
end
end
else
@ -176,81 +187,43 @@ module Homebrew
quarantine = true if quarantine.nil?
download = Cask::Download.new(cask, quarantine:)
fetch_cask(download)
fetch_downloadable(download)
end
end
end
end
downloads.each do |downloadable, promise|
message = "#{downloadable.download_type.capitalize} #{downloadable.name}"
if concurrency > 1
Whirly.start spinner: "arc", status: message
else
puts message
end
promise.wait!
Whirly.configure stop: "✔︎"
Whirly.stop if args.concurrency
rescue ChecksumMismatchError => e
Whirly.configure stop: ""
Whirly.stop if args.concurrency
opoo "#{downloadable.download_type.capitalize} reports different checksum: #{e.expected}"
Homebrew.failed = true if downloadable.is_a?(Resource::Patch)
end
download_queue.shutdown
end
private
def fetch_resource(resource)
puts "Resource: #{resource.name}"
fetch_fetchable resource
rescue ChecksumMismatchError => e
retry if retry_fetch?(resource)
opoo "Resource #{resource.name} reports different sha256: #{e.expected}"
def downloads
@downloads ||= {}
end
def fetch_formula(formula)
fetch_fetchable(formula)
rescue ChecksumMismatchError => e
retry if retry_fetch?(formula)
opoo "Formula reports different sha256: #{e.expected}"
end
def fetch_cask(cask_download)
fetch_fetchable(cask_download)
rescue ChecksumMismatchError => e
retry if retry_fetch?(cask_download)
opoo "Cask reports different sha256: #{e.expected}"
end
def fetch_patch(patch)
fetch_fetchable(patch)
rescue ChecksumMismatchError => e
opoo "Patch reports different sha256: #{e.expected}"
Homebrew.failed = true
end
def retry_fetch?(formula)
@fetch_tries ||= Hash.new { |h, k| h[k] = 1 }
if args.retry? && (@fetch_tries[formula] < FETCH_MAX_TRIES)
wait = 2 ** @fetch_tries[formula]
remaining = FETCH_MAX_TRIES - @fetch_tries[formula]
what = Utils.pluralize("tr", remaining, plural: "ies", singular: "y")
ohai "Retrying download in #{wait}s... (#{remaining} #{what} left)"
sleep wait
formula.clear_cache
@fetch_tries[formula] += 1
true
else
Homebrew.failed = true
false
end
end
def fetch_fetchable(formula)
formula.clear_cache if args.force?
already_fetched = formula.cached_download.exist?
begin
download = formula.fetch(verify_download_integrity: false)
rescue DownloadError
retry if retry_fetch?(formula)
raise
end
return unless download.file?
puts "Downloaded to: #{download}" unless already_fetched
puts "SHA256: #{download.sha256}"
formula.verify_download_integrity(download)
def fetch_downloadable(downloadable)
downloads[downloadable] ||= download_queue.enqueue(RetryableDownload.new(downloadable))
end
end
end

View File

@ -0,0 +1,30 @@
# typed: true # rubocop:todo Sorbet/StrictSigil
# frozen_string_literal: true
require "downloadable"
require "concurrent"
module Homebrew
class DownloadQueue
sig { returns(Concurrent::FixedThreadPool) }
attr_reader :pool
private :pool
sig { params(size: Integer).void }
def initialize(size = 1)
@pool = Concurrent::FixedThreadPool.new(size)
end
sig { params(downloadable: Downloadable).returns(Concurrent::Promise) }
def enqueue(downloadable)
Concurrent::Promise.execute(executor: pool) do
downloadable.fetch(quiet: pool.max_length > 1)
end
end
sig { void }
def shutdown
pool.shutdown
end
end
end

View File

@ -40,6 +40,16 @@ class Downloadable
super
end
sig { returns(String) }
def name
""
end
sig { returns(String) }
def download_type
T.must(T.must(self.class.name).split("::").last).gsub(/([[:lower:]])([[:upper:]])/, '\1 \2').downcase
end
sig { returns(T::Boolean) }
def downloaded?
cached_download.exist?
@ -79,11 +89,18 @@ class Downloadable
end
end
sig { params(verify_download_integrity: T::Boolean, timeout: T.nilable(T.any(Integer, Float))).returns(Pathname) }
def fetch(verify_download_integrity: true, timeout: nil)
sig {
params(
verify_download_integrity: T::Boolean,
timeout: T.nilable(T.any(Integer, Float)),
quiet: T::Boolean,
).returns(Pathname)
}
def fetch(verify_download_integrity: true, timeout: nil, quiet: false)
cache.mkpath
begin
downloader.quiet! if quiet
downloader.fetch(timeout:)
rescue ErrorDuringExecution, CurlDownloadStrategyError => e
raise DownloadError.new(self, e)

View File

@ -567,7 +567,7 @@ class Formula
params(name: String, klass: T.class_of(Resource), block: T.nilable(T.proc.bind(Resource).void))
.returns(T.nilable(Resource))
}
def resource(name, klass = Resource, &block) = active_spec.resource(name, klass, &block)
def resource(name = T.unsafe(nil), klass = T.unsafe(nil), &block) = active_spec.resource(*name, *klass, &block)
# Old names for the formula.
#
@ -2765,8 +2765,15 @@ class Formula
self.class.on_system_blocks_exist? || @on_system_blocks_exist
end
def fetch(verify_download_integrity: true)
active_spec.fetch(verify_download_integrity:)
sig {
params(
verify_download_integrity: T::Boolean,
timeout: T.nilable(T.any(Integer, Float)),
quiet: T::Boolean,
).returns(Pathname)
}
def fetch(verify_download_integrity: true, timeout: nil, quiet: false)
active_spec.fetch(verify_download_integrity:, timeout:, quiet:)
end
def verify_download_integrity(filename)

View File

@ -106,7 +106,7 @@ class ExternalPatch
def initialize(strip, &block)
@strip = strip
@resource = Resource::PatchResource.new(&block)
@resource = Resource::Patch.new(&block)
end
sig { returns(T::Boolean) }

View File

@ -140,7 +140,15 @@ class Resource < Downloadable
Partial.new(self, files)
end
def fetch(verify_download_integrity: true)
sig {
override
.params(
verify_download_integrity: T::Boolean,
timeout: T.nilable(T.any(Integer, Float)),
quiet: T::Boolean,
).returns(Pathname)
}
def fetch(verify_download_integrity: true, timeout: nil, quiet: false)
fetch_patches
super
@ -211,7 +219,7 @@ class Resource < Downloadable
end
def patch(strip = :p1, src = nil, &block)
p = Patch.create(strip, src, &block)
p = ::Patch.create(strip, src, &block)
patches << p
end
@ -260,6 +268,19 @@ class Resource < Downloadable
[*extra_urls, *super].uniq
end
# A resource for a formula.
class Formula < Resource
sig { override.returns(String) }
def name
T.must(owner).name
end
sig { override.returns(String) }
def download_name
name
end
end
# A resource containing a Go package.
class Go < Resource
def stage(target, &block)
@ -320,7 +341,7 @@ class Resource < Downloadable
end
# A resource containing a patch.
class PatchResource < Resource
class Patch < Resource
attr_reader :patch_files
def initialize(&block)

View File

@ -0,0 +1,70 @@
# typed: true # rubocop:todo Sorbet/StrictSigil
# frozen_string_literal: true
module Homebrew
class RetryableDownload < Downloadable
sig { returns(Downloadable) }
attr_reader :downloadable
private :downloadable
sig { params(downloadable: Downloadable, tries: Integer).void }
def initialize(downloadable, tries: 3)
super()
@downloadable = downloadable
@try = 0
@tries = tries
end
sig { override.returns(String) }
def name = downloadable.name
sig { override.returns(String) }
def download_type = downloadable.download_type
sig { override.returns(T::Boolean) }
def downloaded? = downloadable.downloaded?
sig { override.returns(Pathname) }
def cached_download = downloadable.cached_download
sig {
override.params(
verify_download_integrity: T::Boolean,
timeout: T.nilable(T.any(Integer, Float)),
quiet: T::Boolean,
).returns(Pathname)
}
def fetch(verify_download_integrity: true, timeout: nil, quiet: false)
@try += 1
already_downloaded = downloadable.downloaded?
download = downloadable.fetch(verify_download_integrity: false, timeout:, quiet:)
return download unless download.file?
unless quiet
puts "Downloaded to: #{download}" unless already_downloaded
puts "SHA256: #{download.sha256}"
end
downloadable.verify_download_integrity(download) if verify_download_integrity
download
rescue DownloadError, ChecksumMismatchError
tries_remaining = @tries - @try
raise if tries_remaining.zero?
wait = 2 ** @try
unless quiet
what = Utils.pluralize("tr", tries_remaining, plural: "ies", singular: "y")
ohai "Retrying download in #{wait}s... (#{tries_remaining} #{what} left)"
end
sleep wait
downloadable.clear_cache
retry
end
end
end

View File

@ -15,7 +15,7 @@ require "compilers"
require "macos_version"
require "extend/on_system"
class SoftwareSpec
class SoftwareSpec < Downloadable
extend Forwardable
include OnSystem::MacOSAndLinux
@ -34,8 +34,10 @@ class SoftwareSpec
def_delegators :@resource, :sha256
def initialize(flags: [])
super()
# Ensure this is synced with `initialize_dup` and `freeze` (excluding simple objects like integers and booleans)
@resource = Resource.new
@resource = Resource::Formula.new
@resources = {}
@dependency_collector = DependencyCollector.new
@bottle_specification = BottleSpecification.new
@ -78,6 +80,11 @@ class SoftwareSpec
super
end
sig { override.returns(String) }
def download_type
"formula"
end
def owner=(owner)
@name = owner.name
@full_name = owner.full_name
@ -126,8 +133,9 @@ class SoftwareSpec
params(name: String, klass: T.class_of(Resource), block: T.nilable(T.proc.bind(Resource).void))
.returns(T.nilable(Resource))
}
def resource(name, klass = Resource, &block)
def resource(name = T.unsafe(nil), klass = Resource, &block)
if block
raise ArgumentError, "Resource must have a name." if name.nil?
raise DuplicateResourceError, name if resource_defined?(name)
res = klass.new(name, &block)
@ -137,6 +145,8 @@ class SoftwareSpec
dependency_collector.add(res)
res
else
return @resource if name.nil?
resources.fetch(name) { raise ResourceMissingError.new(owner, name) }
end
end
@ -284,7 +294,7 @@ class HeadSoftwareSpec < SoftwareSpec
end
end
class Bottle
class Bottle < Downloadable
class Filename
attr_reader :name, :version, :tag, :rebuild
@ -341,6 +351,8 @@ class Bottle
def_delegators :resource, :cached_download
def initialize(formula, spec, tag = nil)
super()
@name = formula.name
@resource = Resource.new
@resource.owner = formula
@ -360,8 +372,15 @@ class Bottle
root_url(spec.root_url, spec.root_url_specs)
end
def fetch(verify_download_integrity: true)
@resource.fetch(verify_download_integrity:)
sig {
override.params(
verify_download_integrity: T::Boolean,
timeout: T.nilable(T.any(Integer, Float)),
quiet: T.nilable(T::Boolean),
).returns(Pathname)
}
def fetch(verify_download_integrity: true, timeout: nil, quiet: false)
resource.fetch(verify_download_integrity:, timeout:, quiet:)
rescue DownloadError
raise unless fallback_on_error
@ -369,6 +388,7 @@ class Bottle
retry
end
sig { override.void }
def clear_cache
@resource.clear_cache
github_packages_manifest_resource&.clear_cache
@ -388,26 +408,30 @@ class Bottle
resource.downloader.stage
end
def fetch_tab
return if github_packages_manifest_resource.blank?
def fetch_tab(timeout: nil, quiet: false)
return unless (resource = github_packages_manifest_resource)
github_packages_manifest_resource.fetch
rescue DownloadError
raise unless fallback_on_error
begin
resource.fetch(timeout:, quiet:)
rescue DownloadError
raise unless fallback_on_error
retry
rescue ArgumentError
raise if @fetch_tab_retried
retry
rescue ArgumentError
raise if @fetch_tab_retried
@fetch_tab_retried = true
github_packages_manifest_resource.clear_cache
retry
@fetch_tab_retried = true
resource.clear_cache
retry
end
end
def tab_attributes
return {} unless github_packages_manifest_resource&.downloaded?
if (resource = github_packages_manifest_resource) && resource.downloaded?
return resource.tab
end
github_packages_manifest_resource.tab
{}
end
sig { returns(Filename) }
@ -415,8 +439,7 @@ class Bottle
Filename.create(resource.owner, @tag, @spec.rebuild)
end
private
sig { returns(T.nilable(Resource::BottleManifest)) }
def github_packages_manifest_resource
return if @resource.download_strategy != CurlGitHubPackagesDownloadStrategy
@ -439,6 +462,8 @@ class Bottle
end
end
private
def select_download_strategy(specs)
specs[:using] ||= DownloadStrategyDetector.detect(@root_url)
specs[:bottle] = true

View File

@ -32,6 +32,9 @@ class Homebrew::Cmd::FetchCmd::Args < Homebrew::CLI::Args
sig { returns(T::Boolean) }
def casks?; end
sig { returns(T.nilable(String)) }
def concurrency; end
sig { returns(T::Boolean) }
def deps?; end

View File

@ -55,7 +55,7 @@ RSpec.describe Patch do
subject(:patch) { described_class.create(:p2, nil) }
context "when the patch is empty" do
it(:resource) { expect(patch.resource).to be_a Resource::PatchResource }
it(:resource) { expect(patch.resource).to be_a Resource::Patch }
it { expect(patch.patch_files).to eq(patch.resource.patch_files) }
it { expect(patch.patch_files).to eq([]) }
end