源码
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
"""
@File : downloadfile.py
@Time : 2022-08-16 10:55
@Author : 坐公交也用券
@Version : 1.0
@Contact : faith01238@hotmail.com
@Homepage : https://liumou.site
@Desc : Download a file over HTTP(S) to a local directory and print a console progress bar
"""
from contextlib import closing
from os import getcwd, path, getenv, remove
import requests
class ProgressBar(object):
    """Single-line console progress indicator.

    Each call to :meth:`refresh` advances the counter and reprints one
    status line. The line ends with a carriage return while work is in
    progress, so the next call overwrites it, and with a newline once the
    counter reaches the total.
    """

    def __init__(self, title,
                 count=0.0,
                 run_status=None,
                 fin_status=None,
                 total=100.0,
                 unit='', sep='-->>>>',
                 chunk_size=1.0):
        """
        :param title: label printed in brackets at the start of the line
            (for example the file name)
        :param count: amount already processed, in the same raw unit as
            ``total``
        :param run_status: status text shown while ``count`` is below
            ``total``; default: empty string
        :param fin_status: status text shown once ``count`` reaches
            ``total``; default: blanks as wide as ``run_status``
        :param total: raw amount that corresponds to 100%
        :param unit: unit label printed after the scaled values;
            default: empty string
        :param sep: separator printed between the current and the total
            value; default: ``-->>>>``
        :param chunk_size: divisor applied to ``count`` and ``total``
            before they are displayed (e.g. 1024 to show bytes as KB)
        """
        super(ProgressBar, self).__init__()
        self.title = title
        self.total = total
        self.count = count
        self.chunk_size = chunk_size
        self.status = run_status or ""
        self.fin_status = fin_status or " " * len(self.status)
        # Unit label printed next to the scaled values.
        self.unit = unit
        # Separator between the current value and the total value.
        self.seq = sep
        # Current amount, scaled by chunk_size (recomputed on every refresh).
        self.speed = float(1.0)
        # Total amount, scaled by chunk_size (recomputed on every refresh).
        self.percentage = float(1.0)
        # Whole-number percentage, stored as a string after the first refresh.
        self.Overall_progress = float(1.0)

    def __get_info(self):
        """Recompute the scaled values and return the formatted status line."""
        self.speed = self.count / self.chunk_size
        self.percentage = self.total / self.chunk_size
        if self.percentage:
            # Truncate to a whole percent. Slicing the float's string form
            # (the previous approach) rendered 5.0 as "50" and 0.5 as "05".
            percent = int(100.0 * self.speed / self.percentage)
        else:
            # A zero total has nothing left to process; report it as done
            # instead of dividing by zero.
            percent = 100
        self.Overall_progress = str(percent)
        # title - status - current value - unit - separator
        head = "[ %s ] %s %s %s %s" % (self.title, self.status, self.speed,
                                       self.unit, self.seq)
        # total value - unit - overall percentage
        tail = " %s %s 当前进度: %s " % (self.percentage, self.unit,
                                     self.Overall_progress) + "%"
        return head + tail

    def refresh(self, count=1, status=None):
        """Advance the counter by ``count`` and reprint the status line.

        :param count: raw amount processed since the previous call
        :param status: optional status text that replaces the current one
        """
        self.count += count
        self.status = status or self.status
        end_str = "\r"
        if self.count >= self.total:
            # Finished: move to a new line and switch to the final status.
            end_str = '\n'
            self.status = status or self.fin_status
        print(self.__get_info(), end=end_str)
class DownloadFile:
    """Download one URL to a local file while printing a progress bar."""

    def __init__(self, url, dest=None, file_name=None, skip=True):
        """
        :param url: download link
        :param dest: directory to save into; ``None`` (default) means the
            working directory at the time of the call
        :param file_name: name to save the file under; default: the last
            ``/``-separated segment of ``url``
        :param skip: skip the download when a local file with the same
            size as the remote one already exists; default: True
        """
        self.skip = skip
        # Resolve the default here, not in the signature: a default of
        # ``getcwd()`` is evaluated only once, when the class is defined.
        self.dest = getcwd() if dest is None else dest
        if file_name is None:
            file_name = str(url).split('/')[-1]
        # Full path of the target file.
        self.file_name = path.join(self.dest, file_name)
        self.url = url
        # Remote size in bytes from Content-Length (0 when not reported).
        self.content_size = 0
        # Size in bytes of the local file, if one exists.
        self.file_size = 0
        # Whether the target file already exists locally.
        self.files = False
        print(self.url, " ---> ", self.file_name)

    def exists(self):
        """Record whether the target file exists and how large it is.

        :return: True when the target file exists, otherwise False
        """
        self.files = path.isfile(self.file_name)
        if self.files:
            print("文件存在: ", self.file_name)
            self.file_size = path.getsize(self.file_name)
        else:
            # Clear any size left over from an earlier check.
            self.file_size = 0
        return self.files

    def download(self):
        """Fetch the URL and stream the body into the target file.

        Nothing is written when ``skip`` is set and the existing local
        file has the size announced by the server, or when the existing
        file cannot be removed.

        :raises requests.HTTPError: when the server answers with an error
            status
        """
        with closing(requests.get(self.url, stream=True)) as response:
            # Fail before touching the local file so that an error page is
            # never saved in place of the requested file.
            response.raise_for_status()
            chunk_size = 1024  # maximum number of bytes per chunk
            # Content-Length may be missing (e.g. chunked transfer).
            header = response.headers.get('content-length')
            try:
                self.content_size = int(header)
                size_known = True
            except (TypeError, ValueError):
                self.content_size = 0
                size_known = False
            # Only skip when a local file really exists and the remote size
            # is known; "0 == 0" for a missing file is not a match.
            if (self.skip and self.files and size_known
                    and self.content_size == self.file_size):
                print("文件已下载完整,跳过下载")
                return
            if self.files:
                try:
                    remove(self.file_name)
                except OSError as e:
                    # The old file cannot be replaced; report and give up.
                    print(e)
                    return
            progress = None
            if self.content_size > 0:
                # A percentage can only be shown when the total is known.
                progress = ProgressBar(self.file_name,
                                       total=self.content_size,
                                       unit="KB",
                                       chunk_size=chunk_size,
                                       run_status="正在下载",
                                       fin_status="下载完成")
            with open(self.file_name, "wb") as file:
                for data in response.iter_content(chunk_size=chunk_size):
                    file.write(data)
                    if progress is not None:
                        progress.refresh(count=len(data))

    def start(self):
        """Check for an existing local file, then download."""
        self.exists()
        self.download()
if __name__ == '__main__':
    # Demo: save the installer into the user's home directory.
    # expanduser('~') is used instead of getenv('HOME') because HOME is
    # commonly unset on Windows, where getenv returns None and joining the
    # target path would fail.
    down = DownloadFile(url='https://dldir1.qq.com/qqfile/qq/TIM3.3.9/TIM3.3.9.22051.exe',
                        dest=path.expanduser('~'))
    down.start()
效果