2025py重学计划
Day 1
Day 1 Python 语法入门 变量、print、类型、输入输出 写一个 CLI 工具,输入姓名打印问候语
写一个 CLI 工具,输入姓名打印问候语
v1 cat echo.py
name = 'evan'
print( name + " have a good day")
v2
➜ py2025 cat echo.py
name = input("please input your name")
print( name + " have a good day")
py echo.py
please input your name evan
evan have a good day
➜ py2025 cat echo.py
v3
import sys
if len(sys.argv) < 2:
print("Usage: python echo.py <name>")
else:
name = sys.argv[1]
print(f"Hi, {name} have a good day")
➜ py2025 py echo.py
Usage: python echo.py <name>
➜ py2025 py echo.py evan
Hi, evan have a good d
v4
➜ py2025 cat greet.py
import argparse
def main():
parser = argparse.ArgumentParser(description="Say Hi to someone")
parser.add_argument("-n","--name",required=True,help="The name of the person to greet.")
args = parser.parse_args()
print(f"Hello,{args.name} hope you have a good day")
if __name__ == "__main__":
main()
➜ py2025 python greet.py -n evan
Hello,evan hope you have a good day
day2
Day 2 流程控制 if, for, while, break, continue 判断磁盘使用率是否超过阈值
num = [1,2,3,4,5,6,]
for nu in num:
if nu % 2 == 0:
print(f"{num} 偶数")
continue
elif nu == 5:
print(f"找到 5 quit loop")
break
print(f"{num} 奇数")
day3
Day 3 数据结构 list, dict, set, tuple 常用操作 把 log 中的错误代码用 dict 统计次数
py2025 cat errors.log
ERR001
ERR002
ERR001
ERR003
ERR002
ERR001
➜ py2025 py 1.py
ERR001: 3次
ERR002: 2次
ERR003: 1次
➜ py2025 tail 1.py
error_counts = {}
with open("errors.log","r") as file:
for line in file:
error_code = line.strip()
error_counts[error_code] = error_counts.get(error_code,0) + 1
for code,count in error_counts.items():
print(f"{code}: {count}次")
这ai 解说还真的有意思
for code, count in error_counts.items(): 这一行:
error_counts 是一个字典,用于存储错误代码及其出现的次数。例如,error_counts 可能是 {"E001": 3, "E002": 2} 这样的形式。
items() 是字典的一个方法,它会返回一个包含多个元组的可迭代对象,每个元组由字典的键和对应的值组成。对于 {"E001": 3, "E002": 2} 这个字典,error_counts.items() 的结果就是 [("E001", 3), ("E002", 2)] 这样的形式。
for code, count in error_counts.items(): 是一个循环语句,code 和 count 是自定义的变量名。在每次循环中,code 会被赋值为字典的键(也就是错误代码,如 "E001"),count 会被赋值为对应的值(也就是该错误代码出现的次数,如 3)。也就是说,第一次循环时,code 是 "E001",count 是 3;第二次循环时,code 是 "E002",count 是 2。
print(f"{code}: {count} 次") 这一行:
print() 是 Python 内置的函数,用于在控制台输出信息。
f"{code}: {count} 次" 是一个格式化字符串(也称为 f-string)。在这个字符串中,{code} 和 {count} 是占位符,会被 code 和 count 变量的值所替换。例如,当 code 是 "E001",count 是 3 时,f"{code}: {count} 次" 就会被解析为 "E001: 3 次",然后 print() 函数会将这个字符串输出到控制台。
第 7 行:error_code = line.strip()
strip() 方法移除字符串首尾的空白字符(如换行符 \n、空格)。
示例:
"E001\n".strip() → "E001"
" E002 ".strip() → "E002"
第 9 行:error_counts[error_code] = error_counts.get(error_code, 0) + 1
核心逻辑:统计错误代码的出现次数。
get(key, default) 方法:
如果 key 存在于字典中,返回对应的值。
如果 key 不存在,返回默认值 default(这里是 0)。
执行过程:
第一次遇到 "E001":
error_counts.get("E001", 0) 返回 0
0 + 1 = 1,因此 error_counts["E001"] = 1
第二次遇到 "E001":
error_counts.get("E001", 0) 返回 1
1 + 1 = 2,因此 error_counts["E001"] = 2
day 4
Day 4 函数与作用域 定义函数、传参、返回值、局部变量 封装一个“计算服务器负载”的函数
def calculate_server_load(cpu_usage,memory_used,disk_io, total_memory=32,max_io=32):
"""
計算伺服器負載。
參數:
cpu_usage (float): CPU 使用率(%)
memory_used (float): 記憶體使用量(GB)
disk_io (float): 磁碟 I/O 速率(MB/s)
total_memory (float): 總記憶體(GB,預設 32)
max_io (float): 最大 I/O 速率(MB/s,預設 200)
返回:
float: 伺服器負載(%)
"""
if not all(isinstance(x,(int,float)) for x in [cpu_usage,memory_used, disk_io, total_memory,max_io]):
raise ValueError("參數必須為數字")
if cpu_usage < 0 or memory_used < 0 or disk_io < 0 or total_memory <= 0 or max_io <= 0:
raise ValueError("參數不能為負數,且總記憶體與最大 I/O 不能為 0 ")
if cpu_usage > 100:
raise ValueError("CPU 使用率不能大於 100%")
if memory_used > total_memory:
raise ValueError("記憶體使用量不能大於總記憶體")
cpu_load = cpu_usage * 0.4
memory_load = (memory_used / total_memory * 100) * 0.4
io_load = (disk_io / max_io * 100) * 0.2
total_load = cpu_load + memory_load + io_load
return round(total_load,2)
try:
load = calculate_server_load(80,20,100,32,200)
print(f"伺服器負載為 {load}%")
except ValueError as e:
print(f"err{e}")
day 5
Day 5 文件操作 with open, 读写、逐行处理 写一个“读取Nginx日志并分析IP访问量”的脚本
from collections import Counter
import sys
if len(sys.argv) < 2:
print("Usage: python nglog.py log_file")
sys.exit(1)
log_file = sys.argv[1]
ip_counter = Counter()
try:
with open(log_file,'r') as file:
for line in file:
if line.strip():
ip = line.lstrip().split(' ',1)[0]
ip_counter[ip] += 1
except FileNotFoundError:
print(f"Error: file '{log_file}' not found.")
sys.exit(1)
print("IP 访问量统计")
for ip, count in ip_counter.most_common():
print(f"IP: {ip}, Accesses: {count}")
# IP 地址提取:
# line.lstrip().split(' ', 1)[0]:
# lstrip() 去除行首空白字符
# split(' ', 1) 按第一个空格分割字符串
# [0] 取分割后的第一个部分(即 IP 地址)
# cat access.log
# 192.168.1.1 - - [01/Jan/2023:00:00:00 +0000] "GET /index.html HTTP/1.1"
# python nglog.py access.log
# IP 访问量统计
# IP: 192.168.1.1, Accesses: 2
# IP: 192.168.1.2, Accesses: 1
for ip, count in ip_counter.most_common():
ip_counter.most_common() 的作用
ip_counter 是一个 Counter 对象,它存储了每个 IP 地址的出现次数。
例如:Counter({'192.168.1.1': 5, '192.168.1.2': 3, '10.0.0.1': 2})
most_common() 是 Counter 的方法,用于按计数从高到低排序元素。
返回值:一个包含元组的列表,每个元组格式为 (元素, 计数)。
例如:[('192.168.1.1', 5), ('192.168.1.2', 3), ('10.0.0.1', 2)]
循环变量解构
for ip, count in ...:
ip:每次循环中存储当前 IP 地址(元组的第一个元素)。
count:存储该 IP 的访问次数(元组的第二个元素)。
第 3 行:print(f"IP: {ip}, Accesses: {count}")
f-string 格式化输出
语法:f"字符串{变量}" 会将 {} 内的变量值插入字符串。
示例:
当 ip = '192.168.1.1' 且 count = 5 时,输出:
plaintext
IP: 192.168.1.1, Accesses: 5
完整执行流程示例
假设 ip_counter 的内容为:
python
运行
Counter({'192.168.1.1': 3, '10.0.0.1': 2, '192.168.1.100': 1})
第一次循环:
ip = '192.168.1.1',count = 3
输出:IP: 192.168.1.1, Accesses: 3
第二次循环:
ip = '10.0.0.1',count = 2
输出:IP: 10.0.0.1, Accesses: 2
第三次循环:
ip = '192.168.1.100',count = 1
输出:IP: 192.168.1.100, Accesses: 1
为什么用 most_common()?
默认行为:most_common() 不指定参数时,会返回所有元素,并按计数从高到低排序。
等价写法:
python
运行
# 手动排序(效果相同)
for ip, count in sorted(ip_counter.items(), key=lambda x: x[1], reverse=True):
print(f"IP: {ip}, Accesses: {count}")
最终输出示例
plaintext
IP 访问量统计
IP: 192.168.1.1, Accesses: 3
IP: 10.0.0.1, Accesses: 2
IP: 192.168.1.100, Accesses: 1
这两行代码的核心作用是将统计结果以易读的格式展示出来,并按访问量从高到低排序,帮助用户快速定位高频访问的 IP 地址。
day 6
Day 6 错误处理 try...except...finally 模拟连接失败的错误处理
import time
def connect_to_resource(url,max_retries=3):
retries = 0
while retries < max_retries:
try:
print(f"尝试连接到 {url}, 第 {retries+1} 次尝试")
should_fail = (retries < 2)
if should_fail:
raise Exception("Failed to connect to resource")
else:
print("连接成功 {url}")
return True
except Exception as e:
print(f"连接失败 {url}, 错误信息: {e}")
retries += 1
print(f"等 2 秒后重试")
time.sleep(1)
finally:
print("无论连接成功与不,这里都会执行清理操作")
print("-" * 20)
print(f"尝试了 {max_retries} 次,连接失败")
return False
if __name__ == "__main__":
resource_url = "https://wiki.linuxchina.net"
connection_successful = connect_to_resource(resource_url)
if connection_successful:
print("连接成功")
else:
print("连接失败")
尝试连接到 https://wiki.linuxchina.net, 第 1 次尝试
连接失败 https://wiki.linuxchina.net, 错误信息: Failed to connect to resource
等 2 秒后重试
无论连接成功与不,这里都会执行清理操作
--------------------
尝试连接到 https://wiki.linuxchina.net, 第 2 次尝试
连接失败 https://wiki.linuxchina.net, 错误信息: Failed to connect to resource
等 2 秒后重试
无论连接成功与不,这里都会执行清理操作
--------------------
尝试连接到 https://wiki.linuxchina.net, 第 3 次尝试
连接成功 {url}
无论连接成功与不,这里都会执行清理操作
--------------------
连接成功
day 7
Day 7 模块与包 import, sys.path, 自定义模块 把昨天的脚本封装为一个模块
day 8
Day 8 subprocess 与 os 执行 shell 命令,获取结果 写一个“批量检测服务状态”的脚本
socket version 利用python检测远程 IP和端口是否可连接并钉钉报警
nc version
import subprocess
def check_port(host,port):
try:
result = subprocess.run(
['nc','-zv',host,str(port)],
stdout = subprocess.PIPE,
stderr = subprocess.STDOUT,
timeout = 3,
text=True
)
if "succeeded" in result.stdout or "open" in result.stdout:
return f"{host}:{port} is UP"
else:
return f"{host}:{port} is DOWN"
except subprocess.TimeoutExpired:
return f"{host}:{port} Timeout"
except Exception as e:
return f"{host}:{port} Error - {str(e)}"
if __name__ =='__main__':
targets = [
('127.0.0.1',22),
('8.8.8.8',53),
('www.bing.com',80),
]
for host, port in targets:
print(check_port(host,port))
"""
py check_port.py
127.0.0.1:22 is UP
8.8.8.8:53 is UP
www.bing.com:80 is UP
socket version and telnet version
"""
telnet version
import telnetlib
import socket
def check_port(host,port,timeout=3):
try:
with telnetlib.Telnet(host,port,timeout):
return True
except socket.timeout:
return False
except Exception:
return False
if __name__ == '__main__':
targets = [
("127.0.0.1",22),
("linuxsa.org",80)
]
for host,port in targets:
status = "UP" if check_port(host,port) else "DOWN"
print(f"{host}:{port} is {status}")
telnet 报警版
import telnetlib
import socket
import yaml
import requests
import json
from datetime import datetime
DING_WEBHOOK ='https://oapi.dingtalk.com/robot/send?access_token=4578bcd63ad05f3a315ef4f971c70efe9990ef0adc47f804c3273371a73d'
def load_from_yaml(file_path):
with open(file_path,'r') as f:
data = yaml.safe_load(f)
return data.get("targets",[])
def send_ding_alert(message):
headers = {'Content-Type': 'application/json; charset=utf-8'}
data = {
"msgtype": "text",
"text": {
"content": message
}
}
try:
response = requests.post(DING_WEBHOOK, headers=headers, data=json.dumps(data),timeout=5)
except Exception as e:
print(f"Failed to send ding alert: {e}")
def check_port(host,port,timeout=3):
try:
with telnetlib.Telnet(host,port,timeout):
return True
except socket.timeout:
return False
except Exception:
return False
def main():
ips = load_from_yaml("/home/evan/data/tmp/py2025/ips.yaml")
down_list=[]
for item in ips:
host = item.get("host")
port = item.get("port")
if not check_port(host,port):
down_list.append(f"{host}:{port}")
if down_list:
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
message = f"[{now}] Port detection anomaly端口检测异常 {now} 报警: {', '.join(down_list)} 🚨"
send_ding_alert(message)
else:
print("All ports are up")
if __name__ == '__main__':
main()
ips.yaml
targets:
- host: 127.0.0.1
port: 24
- host: 8.8.8.8
port: 53
- host: www.bing.com
port: 89
day 9
Day 9 requests 库 API 请求、返回解析、header/cookie 用钉钉/企业微信接口推送消息
pre: 都是图形操作,你事先有个钉钉群,没有的话创建一个 进入钉钉群 → 添加一个「自定义机器人」
设置关键词(如:报警)
复制 Webhook 地址(例子如下)
import requests
import json
webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=6678bcd63ad05f3a315ef4f971c70efe9990ef0adc47f804c3273371a73dc155'
message_text = "报警: 服务器磁盘使用率超过阈值!🚨- test message"
headers = {
"Content-Type": "application/json",
}
payload = {
"msgtype": "text",
"text": {
"content":message_text
}
}
response = requests.post(url=webhook_url,headers=headers, data=json.dumps(payload))
if response.status_code == 200:
result = response.json()
if result.get("errcode") == 0:
print("message sent successfully")
else:
print("message sent failed, error code: ", result)
else:
print("HTTP err message sent failed, status code: ", response.status_code)
Day 10
paramiko + fabric SSH 自动执行远程命令 写一个“批量部署脚本”上传+执行
from fabric import Connection
from invoke import Responder
servers = [
{"host": "192.168.10.5", "user": "root", "password": "xxxxxxx"},
{"host": "192.168.1.101", "user": "root", "password": "123456"},
]
local_script = "test36.sh"
remote_path = "/tmp/test36.sh"
remote_cmd = f"bash {remote_path}"
for server in servers:
print(f"uploading script to {server['host']}")
conn = Connection(
host=server['host'],
user=server['user'],
connect_kwargs={"password": server['password']}
)
print(f"uploading {local_script} to {remote_path}")
conn.put(local_script, remote_path)
conn.run(f"chmod +x {remote_path}")
print("running remote script")
result = conn.run(remote_cmd,hide=True)
print(result.stdout)
conn.close()
print(f"Done with {server['host']}")
# cat /tmp/test36.sh
#!/bin/bash
echo "Deploying service..."
# 示例命令,可改为拉 Git、重启服务等
systemctl restart nginx
trouble shooting
from .loader import FilesystemLoader # noqa
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3/dist-packages/invoke/loader.py", line 3, in <module>
import imp
ModuleNotFoundError: No module named 'imp'
你使用的 Fabric 版本太老,不兼容你当前的 Python 版本(Python 3.12+ 已经移除了 imp 模块)。
pip3 install --upgrade fabric invoke
Day 11
argparse 与定时任务 解析命令参数、结合 crontab 写一个“CLI磁盘告警工具”定时运行
Day 12
JSON/YAML 配置文件 json, yaml,配置与数据保存 写一个可配置的“日志清理工具”
import os
import glob
import time
import argparse
import json
import yaml
from datetime import datetime,timedelta
def load_config(file_path):
with open(file_path, 'r') as f:
if file_path.endswith('.json'):
return json.load(f)
elif file_path.endswith('.yaml') or file_path.endswith('.yml'):
return yaml.safe_load(f)
else:
raise ValueError('Unsupported file type')
def clean_logs(path,pattern,keep_days):
now = time.time()
cutoff = now - (keep_days * 24 * 60 * 60)
full_patern = os.path.join(path,pattern)
deleted = []
for file in glob.glob(full_patern):
if os.path.isfile(file):
file_mtime = os.path.getmtime(file)
if file_mtime < cutoff:
os.remove(file)
deleted.append(file)
return deleted
def main():
#创建一个命令行解析器对象 parser description 是给这个工具添加一个简短说明,在 --help 里会看到
parser = argparse.ArgumentParser(description="clean log files")
#添加一个参数 --config(必须提供,required=True)
parser.add_argument('--config',required=True,help="配置文件路径 (.json/.yaml)")
args = parser.parse_args()
config = load_config(args.config)
log_items = config.get("log_paths",[])
for item in log_items:
path = item.get("path")
pattern = item.get("pattern","*.log")
keep_days = item.get("keep_days",7)
deleted = clean_logs(path,pattern,keep_days)
for f in deleted:
print(f"deleted {f}")
if __name__ == "__main__":
main()
# sudo python log_cleaner.py --config config.yaml
# deleted /var/log/nginx/access.log
# deleted /var/log/nginx/error.log
# ➜ py2025
"""
假设你的配置文件写的是:
log_paths:
- path: "/var/log/nginx"
pattern: "*.log"
keep_days: 5
- path: "/var/log/app"
keep_days: 10
这段代码会这样处理:
第一个 item:path="/var/log/nginx",pattern="*.log",keep_days=5
第二个 item:path="/var/log/app",pattern="*.log"(默认值),keep_days=10
"""
Day 13
简易项目实战 综合使用上述技能 “自动部署 + 告警 + 日志备份”工具原型
Day 14
回顾 + 面试准备 简述项目、准备英文表达 准备好一个 Python 项目介绍 + 中英文回答模板
other
import os
import time
log_dir= "/var/log/nginx"
now = time.time()
for filename in os.listdir(log_dir):
if filename.endswith(".log"):
filepath = os.path.join(log_dir,filename)
if os.stat(filepath).st_mtime < now - 3600*24*7: # delete log files older than 7 days
os.remove(filepath)
print(f"Deleted {filepath}")
#shell
#!/bin/bash
#LOG_DIR=$1
LOG_DIR="/var/log/apache2"
find "$LOG_DIR" -name "*.log" -type f -mtime +7 -exec rm -f {} \;