效果
使用帮助
PS D:\code\Coding\xc\tools> python .\AutoDisk.py -h
usage: AutoDisk.py [-h] -d DEV [-f FM] [-p PATH] [-i ID] [-pt PARTITION_TABLE] [-u UNIT] [-start START] [-stop STOP] [-s SIZE] [-c] [--force]
设置分区开始位置,默认: 0
-stop STOP, --stop STOP
设置分区结束位置,默认: 100(也就是百分百)
-s SIZE, --size SIZE 设置分区大小(默认使用起始位置控制大小)
-c, --cover 覆盖路径(当挂载路径已经存在的情况下直接覆盖挂载)
--force 是否强制格式化,慎用
--reset 是否使用初始配置文件
PS D:\code\Coding\xc\tools>
源码
# -*- encoding: utf-8 -*-
"""
@File : AutoDisk.py
@Time : 2023-06-16 15:24
@Author : 坐公交也用券
@Version : 1.0
@Contact : liumou.site@qq.com
@Homepage : https://liumou.site
@Desc : 自动化分区、格式化、挂载
"""
import inspect
import os.path
import platform
import sys
from os import path, environ, getcwd, chdir, getenv
from argparse import ArgumentParser
from datetime import datetime
from subprocess import getoutput, getstatusoutput
class ColorLogger:
def __init__(self, file=None, txt=False, class_name=None, cover=False, fileinfo=False, basename=True):
"""
初始化日志模块
:param file: 设置日志文件
:param txt: 是否启用文本记录功能
:param class_name: 调用的Class名称
:param cover: 当使用文本记录的时候,是否覆盖原内容
:param fileinfo: 是否显示日志文件信息
:param basename: 设置日志文件显示信息,True(只显示文件名), False(显示绝对路径)
"""
self.Red = "\033[31m" # 红色
self.Greet = "\033[32m" # 绿色
self.Yellow = '\033[33m' # 黄色
self.Blue = '\033[34m' # 蓝色
self.RESET_ALL = '\033[0m' # 清空颜色
self.basename = basename
self.fileinfo = fileinfo
self.cover = cover
self.class_name = class_name
# 是否启用txt
self.txt_mode = txt
# 日志文件显示名称
self.file_name = None
# 日志文件绝对路径
self.file_path = file
# 日期
self.date = str(datetime.now()).split('.')[0]
# 行数
self.line_ = 1
# 模块名称
self.module_name = None
# 文件名称
self.filename = None
# 日志内容
self.msg1 = None
# 是否启用文件名
self.format_filename = True
# 是否启用日期
self.format_date = True
# 是否启用时间
self.format_time = True
# 是否显示类名称
self.format_class = True
# 是否显示函数名称
self.format_fun = True
# 是否显示行数
self.format_line = True
# 是否显示 等级
self.format_level = True
# 设置等级关系
self.level_dic = {"DEBUG": 0, "INFO": 1, "WARNING": 2, "ERROR": 3}
# 设置文件记录最低等级
self.level_text = 0
# 设置控制台显示最低等级
self.level_console = 0
# 初始化实例参数
self._init_fun()
def _init_fun(self):
"""
初始化函数
:return:
"""
#
if self.file_path is None and self.txt_mode:
if platform.system().lower() == 'linux'.lower():
file = path.join(environ['HOME'], 'ColorInfo.log')
else:
file = path.join(environ["USERPROFILE"], 'ColorInfo.log')
self.file = str(file)
self.file_path = path.abspath(self.file)
else:
self.file_name = " "
# 如果开启了文本记录模式,则实例化一个读写
if self.txt_mode:
try:
# 日志写入实例
self.txt_wr = open(file=self.file_path, mode='a+', encoding='utf-8')
if self.txt_mode:
self.txt_wr.close()
self.txt_wr = open(file=self.file_path, mode='a+', encoding='utf-8')
if self.cover:
self.txt_wr.close()
self.txt_wr = open(file=self.file_path, mode='w+', encoding='utf-8')
except Exception as e:
print(e)
exit(1)
if self.basename:
self.file_name = path.basename(str(self.file_path))
def set_format(self, date_on=True, time_on=True, filename_on=True, class_on=True, fun=True, line=True, level=True):
"""
设置格式开关,默认全开
:param level: 是否显示等级(默认: True)-DEBUG
:param line: 是否显示行号(默认: True)-line: 230
:param fun: 是否显示函数(默认: True)
:param class_on: 是否显示类(默认: True)
:param date_on: 是否显示日期(默认: True)-2022-11-03
:param time_on: 是否显示时间(默认: True)-20:42:24
:param filename_on: 是否显示文件名(源码文件)(默认: True)-ColorInfo.py
:return:
"""
self.format_date = date_on
self.format_filename = filename_on
self.format_time = time_on
self.format_class = class_on
self.format_fun = fun
self.format_line = line
self.format_level = level
def set_level(self, console="DEBUG", text="DEBUG"):
"""
设置显示等级,当实际等级低于设置等级的时候将不会显示/写入
:param console: 设置控制台显示最低等级(DEBUG/INFO/WARNING/ERROR)
:param text: 设置文本记录最低等级(DEBUG/INFO/WARNING/ERROR)
:return:
"""
level_list = ["DEBUG", "INFO", "WARNING", "ERROR"]
if console.upper() not in level_list:
console = "DEBUG"
if text.upper() not in level_list:
text = "DEBUG"
self.level_console = self.level_dic[console.upper()]
self.level_text = self.level_dic[text.upper()]
def fun_info(self, info):
"""
获取function信息
:param info:
:return:
"""
self.line_ = info[1]
self.module_name = info[2]
filename = info[0]
filename = str(filename).split('/')[-1]
if platform.system().lower() == 'windows'.lower():
filename = path.split(filename)[1]
self.filename = filename
def _create_msg(self, msg, level='DEBUG'):
"""
创建信息
:param msg: 信息
:param level: 信息级别
:return:
"""
msg1 = ''
date_ = self.date.split(' ')[0]
time_ = self.date.split(' ')[1]
if self.format_date:
msg1 = date_ + str(msg1)
if self.format_time:
msg1 = str(msg1) + ' ' + str(time_)
if self.format_filename:
msg1 = msg1 + " " + self.filename
if self.format_line:
msg1 = str(msg1) + " line: " + str(self.line_)
if self.class_name is not None and self.format_class:
msg1 = str(msg1) + " - Class: " + str(self.class_name)
if self.module_name != '<module>' and self.format_fun:
msg1 = str(msg1) + " Function: " + self.module_name
if self.format_level:
msg1 = str(msg1) + " - %s : " % level + msg
self.msg1 = msg1
def _wr(self):
try:
# 如果开启了文本日志
if self.txt_mode:
self.txt_wr.write(self.msg1)
self.txt_wr.write("\n")
except Exception as e:
print(self.Red + str(e) + self.RESET_ALL)
def _arg(self, arg):
"""
解析参数
:param arg:
:return:
"""
arg_ = ''
for i in arg:
arg_ = arg_ + str(i)
return arg_
def _get_time(self):
self.date = str(datetime.now()).split('.')[0]
def info(self, msg, *arg, **kwarg):
"""
打印信息
:param msg: 打印内容
:return:
"""
fun_info = inspect.getframeinfo(inspect.currentframe().f_back)
self.fun_info(info=fun_info)
self._get_time()
if arg:
msg = str(msg) + str(self._arg(arg=arg))
if kwarg:
msg = str(msg) + str(self._arg(arg=kwarg))
self._create_msg(msg=msg, level="INFO")
mess = str(self.Greet + self.msg1 + self.RESET_ALL)
if self.fileinfo and self.txt_mode:
mess = str(self.Greet + str(self.file_name) + ' <<-- ' + self.msg1 + self.RESET_ALL)
if self.level_console <= 1:
print(mess)
if self.level_text <= 1:
self._wr()
def debug(self, msg, *arg, **kwarg):
"""
打印信息
:param msg: 打印内容
:return:
"""
fun_info = inspect.getframeinfo(inspect.currentframe().f_back)
self.fun_info(info=fun_info)
self._get_time()
if arg:
msg = str(msg) + str(self._arg(arg=arg))
if kwarg:
msg = str(msg) + str(self._arg(arg=kwarg))
self._create_msg(msg=msg)
mess = str(self.Blue + self.msg1 + self.RESET_ALL)
if self.fileinfo and self.txt_mode:
mess = str(self.Blue + str(self.file_name) + ' <<-- ' + self.msg1 + self.RESET_ALL)
if self.level_console == 0:
print(mess)
if self.level_text <= 0:
self._wr()
def warning(self, msg, *arg, **kwarg):
"""
打印信息
:param msg: 打印内容
:return:
"""
fun_info = inspect.getframeinfo(inspect.currentframe().f_back)
self.fun_info(info=fun_info)
self._get_time()
if arg:
msg = str(msg) + str(self._arg(arg=arg))
if kwarg:
msg = str(msg) + str(self._arg(arg=kwarg))
self._create_msg(msg=msg, level="WARNING")
mess = str(self.Yellow + self.msg1 + self.RESET_ALL)
if self.fileinfo and self.txt_mode:
mess = str(self.Yellow + str(self.file_name) + ' <<-- ' + self.msg1 + self.RESET_ALL)
if self.level_console <= 2:
print(mess)
if self.level_text <= 2:
self._wr()
def error(self, msg, *arg, **kwarg):
"""
打印信息
:param msg: 打印内容
:return:
"""
fun_info = inspect.getframeinfo(inspect.currentframe().f_back)
self.fun_info(info=fun_info)
self._get_time()
if arg:
msg = str(msg) + str(self._arg(arg=arg))
if kwarg:
msg = str(msg) + str(self._arg(arg=kwarg))
self._create_msg(msg=msg, level="ERROR")
mess = str(self.Red + self.msg1 + self.RESET_ALL)
if self.fileinfo and self.txt_mode:
mess = str(self.Red + str(self.file_name) + ' <<-- ' + self.msg1 + self.RESET_ALL)
if self.level_console <= 3:
print(mess)
if self.level_text <= 3:
self._wr()
class ComMand:
def __init__(self, password, cmd=None, terminal=False, logs=False, work=getcwd()):
"""
执行系统指令
:param terminal: 是否使用图形终端执行, 全局默认,子功能可自定义
:param password: 主机密码(string)
:param cmd: 需要执行的命令(string)
:param logs: 是否使用日志打印信息
:param work: 工作目录(string)
"""
self.pwd = getcwd()
self.logs = logs
chdir(work)
# 是否使用终端执行
self.terminal = terminal
# 设置主机密码
self.password = password
# 传入初始命令
self.cmd = cmd
# 生成最终命令
self.cmd_ = ''
# 执行结果
self.ok = False
# 退出状态码
self.code = 0
# 是否有可用终端
self.use_terminal = False
# 终端类型
self.terminal_type = ''
# 终端参数
self.terminal_arg = ''
# home路径
self.home = getenv('HOME')
# 临时脚本文件
self.sh = path.join(self.home, 'run_tmp.sh')
self.user = getoutput('echo $USER')
self.uid = getoutput('echo $UID')
# 系统类型
self.os_type = platform.system()
if str(self.os_type).lower() == 'linux'.lower():
self.os_type = getoutput("""grep ^ID /etc/os-release | sed 's/ID=//' | sed -n 1p | sed 's#\"##g'""")
# 定义日志文件
log_file = path.join(self.home, 'cmd_.log')
# 初始化日志
self.logger = ColorLogger(file=log_file, txt=logs, class_name=self.__class__.__name__)
if self.terminal:
self._get_terminal()
def create(self):
"""
创建命令脚本
:return:
"""
try:
with open(file=self.sh, mode='w+', encoding='utf-8') as w:
w.write('#!/bin/bash\n')
w.write(self.cmd_)
w.close()
except Exception as e:
print(e)
os.system("chmod +x %s" % self.sh)
os.system("cat %s" % self.sh)
def _get_terminal(self):
"""
获取终端类型
:return:
"""
t_ = {'mate-terminal': '-e', 'gnome-terminal': '-e', 'deepin-terminal': '-C'}
for i in t_:
cmd = "which %s" % str(i)
if int(getstatusoutput(cmd)[0]) == 0:
self.terminal_type = str(i)
self.terminal_arg = t_[i]
self.use_terminal = True
# print('存在终端程序: %s' % str(self.terminal_type))
return True
# print('找不到终端程序: %s' % str(self.terminal_type))
self.terminal = False
return False
def terminal_fun(self):
"""
使用终端执行命令
:return:
"""
if self._get_terminal():
cmd = """%s %s '%s;read -p "请按回车关闭此界面"'""" % (self.terminal_type, self.terminal_arg, self.cmd_)
if self.logs:
self.logger.debug(cmd)
else:
print(cmd)
getoutput(cmd)
else:
print('找不到终端类型,当前系统类型: %s' % self.os_type)
def shell(self, cmd=None, terminal=None):
"""
执行普通Shell命令
:param terminal: 是否使用终端
:param cmd: 需要执行的命令,默认使用实例初始命令
:return: 当使用父进程的时候,返回执行结果(bool)
"""
if cmd is None:
cmd = self.cmd
if terminal is None:
terminal = self.terminal
self.cmd_ = cmd
if self.logs:
self.logger.debug(self.cmd_)
else:
print(self.cmd_)
if terminal and self.use_terminal:
self.terminal_fun()
else:
try:
self.code = os.system(self.cmd_)
except Exception as e:
self.logger.error(str(e))
if int(self.code) == 0:
return True
return False
def sudo(self, cmd=None, terminal=None, name=None):
"""
执行sudo命令
:param cmd: 需要执行的命令,默认使用实例初始命令
:param terminal: 是否使用终端执行(创建新进程,无法获取执行结果),默认使用实例值
:param name: 任务名称
:return: 当 terminal 等于 True则直接返回True, 否则返回命令结果(bool),同时将退出代码赋予self.code
"""
if cmd is None:
cmd = self.cmd
if terminal is None:
terminal = self.terminal
self.cmd_ = str("""echo %s | sudo -S %s""" % (self.password, cmd))
if str(self.user).lower() == 'root' or str(self.uid) == '0':
self.cmd_ = cmd
if terminal and self.use_terminal:
self.terminal_fun()
if name:
print("[ %s ] 已通过终端执行" % str(name))
return True
else:
mess = '执行成功'
ok = True
try:
self.code = os.system(self.cmd_)
except Exception as e:
self.logger.error(str(e))
if int(self.code) != 0:
mess = '执行失败'
ok = False
if name is None:
name = self.cmd_
print("[ %s ] %s" % (str(name), str(mess)))
self.ok = ok
return self.ok
def getout(self, cmd=None):
"""
获取命令输出
:param cmd: 需要执行的命令,默认使用实例初始命令
:return: 返回执行过程数据(str),执行结果通过self.code获取
"""
if cmd is None:
cmd = self.cmd
if self.logs:
self.logger.debug(cmd)
i = getstatusoutput(cmd)
self.code = i[0]
return i[1]
def getout_sudo(self, cmd=None, name=None, debug=None, mess_ok=None, mess_failed=None):
"""
获取sudo权限命令输出
:param mess_failed: 执行失败的信息,默认: 执行失败
:param mess_ok: 执行成功的信息,默认: 执行成功
:param debug: debug
:param name: 任务名称,默认: 完整命令
:param cmd: 需要执行的命令,默认使用实例初始命令
:return: 返回执行过程数据(str),执行结果通过self.code获取
"""
if mess_ok is None:
mess_ok = '执行成功'
if mess_failed is None:
mess_failed = '执行失败'
if cmd is None:
cmd = self.cmd
cmd = str("echo %s | sudo -S %s" % (self.password, cmd))
if name is None:
name = cmd
if debug:
self.logger.debug(cmd)
if debug is None:
debug = self.logs
i = getstatusoutput(cmd)
self.code = i[0]
if int(self.code) == 0:
if debug:
print("[ %s ] %s" % (str(name), str(mess_ok)))
else:
if debug:
print("[ %s ] %s" % (str(name), str(mess_failed)))
return i[1]
def echo_to_file(self, file, cmd):
"""
追加echo结果到文件
:param file: 例如: /etc/sysctl.conf
:param cmd: 例如: echo 123
:return:
"""
os.system("echo {0} | sudo -S pwd".format(self.password))
cmd = str("{0} | sudo tee -a {1}".format(cmd, file))
self.shell(cmd=cmd, terminal=False)
def add_path(self, paths):
"""
追加path路径到用户变量
:param paths: 需要追加的path
:return:
"""
path_list = getenv("PATH").lower().split(":")
if str(paths).lower() in path_list:
self.logger.debug("Path env is Exists")
return False
else:
print("Add [ %s ] To PATH" % paths)
file = path.join(self.home, '.bashrc')
try:
w = open(file=file, mode='w+', encoding='utf8')
txt_path = str("\nexport PATH=${PATH}:%s" % paths)
w.write(txt_path)
w.close()
return True
except Exception as e:
self.logger.error("add_path: %s " % str(e))
return False
class AutoDisk:
def __init__(self, fm: str, dev: str, size: int, p: str, start="0%", stop="100%"):
"""
:param fm: 格式名称
:param dev: 设备
:param size: 大小
:param p: 挂载路径
:param start: 分区起始值
:param stop: 分区结束值
"""
self.stop = stop
self.start = start
self.mount_path = p
self.size = size
# self.su = su
self.dev = dev
self.fm = fm
self.logger = ColorLogger(class_name=self.__class__.__name__)
# 设置分区表格式
self.partition_table = "gpt"
# 设置分区,例如: /dev/vdb1
self.partitions = None
def bak(self):
"""
备份配置文件
:return:
"""
bak = "/etc/bak_fstab"
if os.path.isfile(bak):
self.logger.debug("已备份配置")
return
if os.system("cp -rf /etc/fstab {0}".format(bak)) == 0:
self.logger.info("配置文件备份成功")
else:
self.logger.error("配置文件备份失败")
sys.exit(2)
def check_format(self):
"""
检查分区格式是否标准
:return:
"""
f = str("""ext2
ext3
ext4
xfs
Btrfs
NTFS
FAT32
vfat""").split("\n")
self.fm = str(self.fm).replace(" ", "").replace("\n", "")
for i in f:
i = str(i).replace(" ", "")
if self.fm.lower() == i.lower():
return True
return False
def check_mount(self):
"""
检查目标路径是否已经存在
:return:
"""
if os.path.exists(self.mount_path):
self.logger.warning("文件路径已存在:", self.mount_path)
return False
os.system("mkdir -p {0}".format(self.mount_path))
def partition(self):
"""
开始分区
:return:
"""
global arg_id
c = "parted -s {0} mklabel {1}".format(self.dev, self.partition_table)
m = {"创建分区表格式": c}
# self.logger.debug("创建分区表格式")
# print(c)
c = "parted -s {0} mkpart primary {1} {2} {3}".format(self.dev, self.fm, self.start, self.stop)
m["创建分区"] = c
# self.logger.debug("创建分区")
# print(c)
self.partitions = self.dev + str(arg_id)
c = "mkfs.{0} {1}".format(self.fm, self.partitions)
if arg_force:
c = "mkfs.{0} -f {1}".format(self.fm, self.partitions)
m["格式化分区"] = c
# self.logger.debug("格式化分区")
# print(c)
for n in m:
self.logger.debug(m[n])
if os.system(m[n]) == 0:
self.logger.info("{0} 成功".format(n))
else:
self.logger.error("{0} 失败".format(n))
e = input("是否终止[y/n]")
if str(e).lower() == "y".lower():
sys.exit(1)
def mount(self):
"""
挂载分区
:return:
"""
c = "mount {0} {1}".format(self.partitions, self.mount_path)
self.logger.debug("开始挂载")
print(c)
self.logger.debug("开始检测挂载")
if os.system(c) == 0:
self.logger.info("挂载成功")
else:
self.logger.warning("挂载配置错误")
sys.exit(2)
def reset(self):
"""
恢复配置文件
:return:
"""
c = f"cp -rf /etc/bak_fstab /etc/fstab"
if os.system(c) == 0:
self.logger.info("配置文件重置成功")
else:
self.logger.error("配置文件重置失败")
def add(self):
"""
添加配置
:return:
"""
if arg_reset:
self.reset()
c = "blkid %s" % self.partitions
self.logger.debug("UUID获取命令")
print(c)
g = getstatusoutput(c)
if g[0] != 0:
self.logger.error("UUID获取失败")
sys.exit(4)
uuid = str(g[1]).replace('"', "'").split(" ")[1]
self.logger.debug("获取结果")
print(uuid)
conf = "{0}\t{1}\t{2}\tdefaults\t0\t0\n".format(uuid, self.mount_path, self.fm)
self.logger.debug("配置参数如下")
print(conf)
try:
with open(file="/etc/fstab", mode="a+", encoding="utf8") as w:
w.write(conf)
w.close()
except Exception as e:
w.close()
self.reset()
self.logger.error("配置写入失败")
self.logger.error(str(e))
def check_root(self):
"""
检查用户权限
:return:
"""
if getoutput("echo $USER") == "root".lower():
self.logger.info("用户权限检验通过")
else:
self.logger.error("请使用ROOT用户操作")
sys.exit(2)
def check_conf(self):
"""
检查配置
:return:
"""
o = getoutput("mount -a")
if len(o) >= 2:
self.logger.error("配置异常")
else:
self.logger.info("暂未检测到异常")
def handel(self):
"""
开始处理
:return:
"""
self.check_root()
self.bak()
self.check_format()
self.check_mount()
self.partition()
self.mount()
self.add()
self.check_conf()
if __name__ == "__main__":
arg = ArgumentParser(description='AutoDisk')
arg.add_argument('-d', '--dev', type=str, help='设置需要分区的设备,例如: sdb', required=True)
arg.add_argument('-f', '--fm', type=str, default="xfs", help='设置文件格式,默认: xfs', required=False)
arg.add_argument('-p', '--path', type=str, help='设置挂载路径,默认: /mnt', default="/mnt", required=False)
arg.add_argument('-i', '--id', type=int, help='设置分区号,默认: 1 ,例如: /dev/vdb1 ', default=1, required=False)
arg.add_argument('-pt', '--partition_table', type=str, help="设置分区表类型,默认: gpt", default="gpt",
required=False)
arg.add_argument('-u', '--unit', type=str, default="GB", help='设置分区大小单位(当传入size参数有效),默认: GB',
required=False)
arg.add_argument('-start', '--start', type=str, default="0%", help='设置分区开始位置,默认: 0', required=False)
arg.add_argument('-stop', '--stop', type=str, default="100%", help='设置分区结束位置,默认: 100(也就是百分百)',
required=False)
arg.add_argument('-s', '--size', type=int, help='设置分区大小(默认使用起始位置控制大小)', required=False)
arg.add_argument('-c', '--cover', action="store_true", help='覆盖路径(当挂载路径已经存在的情况下直接覆盖挂载)')
arg.add_argument('--force', action="store_true", help='是否强制格式化,慎用')
arg.add_argument('--reset', action="store_true", help='是否使用初始配置文件')
args = arg.parse_args()
arg_dev = args.dev
arg_dev = os.path.join("/dev", arg_dev)
if not os.path.exists(arg_dev):
print("设备不存在: ", arg_dev)
sys.exit(4)
arg_format = args.fm
arg_path = args.path
arg_pt = args.partition_table
arg_size = args.size
arg_unit = args.unit
arg_start = args.start
arg_stop = args.stop
arg_cover = args.cover
arg_id = args.id
arg_force = args.force
arg_reset = args.reset
fd = AutoDisk(fm=arg_format, dev=arg_dev, size=arg_size, p=arg_path, start=arg_start, stop=arg_stop)
fd.handel()