Merge pull request #20272 from Homebrew/refactor_download_queue
Refactor DownloadQueue handling
This commit is contained in:
commit
042c79e7ec
@ -26,7 +26,6 @@ module Homebrew
|
|||||||
"(Pass `all` to download for all architectures.)"
|
"(Pass `all` to download for all architectures.)"
|
||||||
flag "--bottle-tag=",
|
flag "--bottle-tag=",
|
||||||
description: "Download a bottle for given tag."
|
description: "Download a bottle for given tag."
|
||||||
flag "--concurrency=", description: "Number of concurrent downloads.", hidden: true
|
|
||||||
switch "--HEAD",
|
switch "--HEAD",
|
||||||
description: "Fetch HEAD version instead of stable version."
|
description: "Fetch HEAD version instead of stable version."
|
||||||
switch "-f", "--force",
|
switch "-f", "--force",
|
||||||
@ -150,12 +149,7 @@ module Homebrew
|
|||||||
download_queue.enqueue(resource)
|
download_queue.enqueue(resource)
|
||||||
end
|
end
|
||||||
|
|
||||||
formula.resources.each do |r|
|
formula.enqueue_resources_and_patches(download_queue:)
|
||||||
download_queue.enqueue(r)
|
|
||||||
r.patches.each { |patch| download_queue.enqueue(patch.resource) if patch.external? }
|
|
||||||
end
|
|
||||||
|
|
||||||
formula.patchlist.each { |patch| download_queue.enqueue(patch.resource) if patch.external? }
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
else
|
else
|
||||||
@ -181,18 +175,13 @@ module Homebrew
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
download_queue.start
|
download_queue.fetch
|
||||||
ensure
|
ensure
|
||||||
download_queue.shutdown
|
download_queue.shutdown
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
sig { returns(Integer) }
|
|
||||||
def concurrency
|
|
||||||
@concurrency ||= T.let(args.concurrency&.to_i || 1, T.nilable(Integer))
|
|
||||||
end
|
|
||||||
|
|
||||||
sig { returns(Integer) }
|
sig { returns(Integer) }
|
||||||
def retries
|
def retries
|
||||||
@retries ||= T.let(args.retry? ? FETCH_MAX_TRIES : 0, T.nilable(Integer))
|
@retries ||= T.let(args.retry? ? FETCH_MAX_TRIES : 0, T.nilable(Integer))
|
||||||
@ -201,7 +190,7 @@ module Homebrew
|
|||||||
sig { returns(DownloadQueue) }
|
sig { returns(DownloadQueue) }
|
||||||
def download_queue
|
def download_queue
|
||||||
@download_queue ||= T.let(begin
|
@download_queue ||= T.let(begin
|
||||||
DownloadQueue.new(concurrency:, retries:, force: args.force?)
|
DownloadQueue.new(retries:, force: args.force?)
|
||||||
end, T.nilable(DownloadQueue))
|
end, T.nilable(DownloadQueue))
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -8,16 +8,16 @@ require "retryable_download"
|
|||||||
|
|
||||||
module Homebrew
|
module Homebrew
|
||||||
class DownloadQueue
|
class DownloadQueue
|
||||||
sig { params(concurrency: Integer, retries: Integer, force: T::Boolean).void }
|
sig { params(retries: Integer, force: T::Boolean).void }
|
||||||
def initialize(concurrency:, retries:, force:)
|
def initialize(retries: 0, force: false)
|
||||||
@concurrency = concurrency
|
@concurrency = T.let(EnvConfig.download_concurrency, Integer)
|
||||||
@quiet = T.let(concurrency > 1, T::Boolean)
|
@quiet = T.let(@concurrency > 1, T::Boolean)
|
||||||
@tries = T.let(retries + 1, Integer)
|
@tries = T.let(retries + 1, Integer)
|
||||||
@force = force
|
@force = force
|
||||||
@pool = T.let(Concurrent::FixedThreadPool.new(concurrency), Concurrent::FixedThreadPool)
|
@pool = T.let(Concurrent::FixedThreadPool.new(concurrency), Concurrent::FixedThreadPool)
|
||||||
end
|
end
|
||||||
|
|
||||||
sig { params(downloadable: T.any(Resource, Bottle, Cask::Download)).void }
|
sig { params(downloadable: Downloadable).void }
|
||||||
def enqueue(downloadable)
|
def enqueue(downloadable)
|
||||||
downloads[downloadable] ||= Concurrent::Promises.future_on(
|
downloads[downloadable] ||= Concurrent::Promises.future_on(
|
||||||
pool, RetryableDownload.new(downloadable, tries:), force, quiet
|
pool, RetryableDownload.new(downloadable, tries:), force, quiet
|
||||||
@ -28,7 +28,7 @@ module Homebrew
|
|||||||
end
|
end
|
||||||
|
|
||||||
sig { void }
|
sig { void }
|
||||||
def start
|
def fetch
|
||||||
if concurrency == 1
|
if concurrency == 1
|
||||||
downloads.each do |downloadable, promise|
|
downloads.each do |downloadable, promise|
|
||||||
promise.wait!
|
promise.wait!
|
||||||
@ -133,6 +133,8 @@ module Homebrew
|
|||||||
$stdout.flush
|
$stdout.flush
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
downloads.clear
|
||||||
end
|
end
|
||||||
|
|
||||||
sig { void }
|
sig { void }
|
||||||
@ -166,10 +168,9 @@ module Homebrew
|
|||||||
sig { returns(T::Boolean) }
|
sig { returns(T::Boolean) }
|
||||||
attr_reader :quiet
|
attr_reader :quiet
|
||||||
|
|
||||||
sig { returns(T::Hash[T.any(Resource, Bottle, Cask::Download), Concurrent::Promises::Future]) }
|
sig { returns(T::Hash[Downloadable, Concurrent::Promises::Future]) }
|
||||||
def downloads
|
def downloads
|
||||||
@downloads ||= T.let({}, T.nilable(T::Hash[T.any(Resource, Bottle, Cask::Download),
|
@downloads ||= T.let({}, T.nilable(T::Hash[Downloadable, Concurrent::Promises::Future]))
|
||||||
Concurrent::Promises::Future]))
|
|
||||||
end
|
end
|
||||||
|
|
||||||
class Spinner
|
class Spinner
|
||||||
|
@ -630,5 +630,13 @@ module Homebrew
|
|||||||
def devcmdrun?
|
def devcmdrun?
|
||||||
Homebrew::Settings.read("devcmdrun") == "true"
|
Homebrew::Settings.read("devcmdrun") == "true"
|
||||||
end
|
end
|
||||||
|
|
||||||
|
sig { returns(Integer) }
|
||||||
|
def download_concurrency
|
||||||
|
# TODO: document this variable when ready to publicly announce it.
|
||||||
|
concurrency = ENV.fetch("HOMEBREW_DOWNLOAD_CONCURRENCY", 1).to_i
|
||||||
|
concurrency = 1 if concurrency <= 1
|
||||||
|
concurrency
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -3202,6 +3202,15 @@ class Formula
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
sig { params(download_queue: Homebrew::DownloadQueue).void }
|
||||||
|
def enqueue_resources_and_patches(download_queue:)
|
||||||
|
resources.each do |resource|
|
||||||
|
download_queue.enqueue(resource)
|
||||||
|
resource.patches.select(&:external?).each { |patch| download_queue.enqueue(patch.resource) }
|
||||||
|
end
|
||||||
|
patchlist.select(&:external?).each { |patch| download_queue.enqueue(patch.resource) }
|
||||||
|
end
|
||||||
|
|
||||||
sig { void }
|
sig { void }
|
||||||
def fetch_patches
|
def fetch_patches
|
||||||
patchlist.select(&:external?).each(&:fetch)
|
patchlist.select(&:external?).each(&:fetch)
|
||||||
|
@ -5,10 +5,6 @@ module Homebrew
|
|||||||
class RetryableDownload
|
class RetryableDownload
|
||||||
include Downloadable
|
include Downloadable
|
||||||
|
|
||||||
sig { returns(Downloadable) }
|
|
||||||
attr_reader :downloadable
|
|
||||||
private :downloadable
|
|
||||||
|
|
||||||
sig { override.returns(T.any(NilClass, String, URL)) }
|
sig { override.returns(T.any(NilClass, String, URL)) }
|
||||||
def url = downloadable.url
|
def url = downloadable.url
|
||||||
|
|
||||||
@ -92,5 +88,10 @@ module Homebrew
|
|||||||
|
|
||||||
sig { override.returns(String) }
|
sig { override.returns(String) }
|
||||||
def download_name = downloadable.download_name
|
def download_name = downloadable.download_name
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
|
sig { returns(Downloadable) }
|
||||||
|
attr_reader :downloadable
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -19,7 +19,7 @@ RSpec.describe Homebrew::Cmd::FetchCmd do
|
|||||||
setup_test_formula "testball1"
|
setup_test_formula "testball1"
|
||||||
setup_test_formula "testball2"
|
setup_test_formula "testball2"
|
||||||
|
|
||||||
expect { brew "fetch", "testball1", "testball2", "--concurrency=2" }.to be_a_success
|
expect { brew "fetch", "testball1", "testball2", "HOMEBREW_DOWNLOAD_CONCURRENCY" => "2" }.to be_a_success
|
||||||
|
|
||||||
expect(HOMEBREW_CACHE/"testball1--0.1.tbz").to be_a_symlink
|
expect(HOMEBREW_CACHE/"testball1--0.1.tbz").to be_a_symlink
|
||||||
expect(HOMEBREW_CACHE/"testball1--0.1.tbz").to exist
|
expect(HOMEBREW_CACHE/"testball1--0.1.tbz").to exist
|
||||||
@ -33,7 +33,7 @@ RSpec.describe Homebrew::Cmd::FetchCmd do
|
|||||||
setup_test_formula "testball1"
|
setup_test_formula "testball1"
|
||||||
setup_test_formula "testball3"
|
setup_test_formula "testball3"
|
||||||
|
|
||||||
expect { brew "fetch", "testball1", "testball3", "--concurrency=2" }.to be_a_failure
|
expect { brew "fetch", "testball1", "testball3", "HOMEBREW_DOWNLOAD_CONCURRENCY" => "2" }.to be_a_failure
|
||||||
.and output(/Error:.*process has already locked/).to_stderr
|
.and output(/Error:.*process has already locked/).to_stderr
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
Loading…
x
Reference in New Issue
Block a user