Python和运维
跳转到导航
跳转到搜索
python常用的运维脚本
目录备份打包
cat back.py
import os
import shutil
from datetime import datetime
source_dir = "/tmp/bakdir/"
backup_dir = "/home/evan/tmp/"
if not os.path.exists(backup_dir):
os.makedirs(backup_dir)
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
backup_path =os.path.join(backup_dir,f"backup_{timestamp}.zip")
# or 也可以用 tar.bz2
backup_path =os.path.join(backup_dir,f"backup_{timestamp}.tar.bz2")
shutil.make_archive(backup_path[:-4],'zip',source_dir)
#bz2
shutil.make_archive(backup_path[:-4],'bztar',source_dir)
print(f"Backup created: {backup_path}")
➜ shtpm python3 back.py
Backup created: /home/evan/tmp/backup_20241205164441.zip
不去掉 zip 会比较难看
backup_20241205165030.zip.zip
前
➜ shtpm ls /home/evan/tmp
ls: 无法访问 '/home/evan/tmp': 没有那个文件或目录
后
➜ shtpm ls /home/evan/tmp
backup_20241205164441.zip
试一下chatgpt 不借 随便找个问
backup_path 是一个字符串变量,表示备份文件的路径(例如:/home/evan/tmp/backup_20241205120000.zip)。
[:-4] 表示从字符串开头到倒数第 4 个字符(不包含倒数第 4 个字符)的部分。
在这个例子中,-4 对应的是 .zip 的开始索引,因此会去掉 .zip
Python 3 面向对象来监控 Linux 上 Nginx 进程的脚本
import psutil
class NginxMonitor:
def __init__(self):
self.nginx_processes = []
def find_nginx_processes(self):
for proc in psutil.process_iter():
try:
if 'nginx' in proc.name():
self.nginx_processes.append(proc)
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
return self.nginx_processes
def print_nginx_process_info(self):
processes = self.find_nginx_processes()
if not processes:
print("No nginx process found")
else:
for proc in processes:
print("Nginx Process ID: {}".format(proc.pid))
print("Parent Process ID: {}".format(proc.ppid))
print("CPU Percent: {}%".format(proc.cpu_percent(interval=1)))
print("Memory Usage: {} bytes".format(proc.memory_info().rss))
print("Status: {}".format(proc.status()))
print("Command Line: {}".format(proc.cmdline()))
if __name__ == '__main__':
monitor = NginxMonitor()
monitor.print_nginx_process_info()
# ➜ tmp py3 ngmonitor.py
# Nginx Process ID: 70189
# Parent Process ID: <bound method Process.ppid of psutil.Process(pid=70189, name='nginx', status='sleeping', started='15:14:38')>
# CPU Percent: 0.0%
# Memory Usage: 1777664 bytes
# Status: sleeping
# Command Line: ['nginx:', 'master', 'process', '/usr/sbin/nginx', '-g', 'daemon', 'on;', 'master_process', 'on;']
# Nginx Process ID: 70190
# Parent Process ID: <bound method Process.ppid of psutil.Process(pid=70190, name='nginx', status='sleeping', started='15:14:38')>
# CPU Percent: 0.0%
# Memory Usage: 5734400 bytes
# Status: sleeping
# Command Line: ['nginx: worker process', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '']
# Nginx Process ID: 70191
# Parent Process ID: <bound method Process.ppid of psutil.Process(pid=70191, name='nginx', status='sleeping', started='15:14:38')>
# CPU Percent: 0.0%
# Memory Usage: 5734400 bytes
# Status: sleeping
# Command Line: ['nginx: worker process', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '']
ai说明还真的 6
1. 类的定义
NginxMonitor类:
__init__方法初始化一个空列表nginx_processes,用于存储找到的 Nginx 进程。
2. 方法
find_nginx_processes方法:
通过遍历系统中的所有进程(使用psutil.process_iter())。
对于每个进程,尝试获取其名称,如果名称中包含nginx,则将该进程添加到nginx_processes列表中。
在这个过程中,捕获可能出现的进程不存在、访问被拒绝和僵尸进程等异常情况。
print_nginx_process_info方法:
首先调用find_nginx_processes方法获取 Nginx 进程列表。
如果列表为空,打印提示信息表示没有找到 Nginx 进程。
如果列表不为空,对于每个 Nginx 进程,打印出进程 ID(pid)、父进程 ID(ppid)、CPU 使用率(cpu_percent)、内存使用量(memory_info().rss,这里的rss表示常驻内存集大小)、进程状态(status)和命令行参数(cmdline)。
3. 主程序
在if __name__ == "__main__"部分,实例化NginxMonitor类,并调用print_nginx_process_info方法来打印 Nginx 进程的相关信息。
端口check
import socket
def check_port(port):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(1)
result = s.connect_ex(("localhost", port))
if result == 0:
print(f"Port {port} is open.")
else:
print(f"Port {port} is closed.")
s.close()
except socket.error as e:
print(f"Error: {e}")
if __name__ == "__main__":
check_port(80)
从日志文件中提取所有的错误日志并存储到另一个文件中 方便给开发人员查看
数据处理与脚本 之 Python 脚本,从日志文件中提取所有的错误日志并存储到另一个文件中 方便给开发人员查看。
# errmv.py
def extract_errors(log_file,output_file):
with open(log_file,'r') as infile, open(output_file,'w') as outfile:
for line in infile:
# //if "ERROR" in line:
if "failed" in line:
outfile.write(line)
extract_errors('nglog.log','error.log')
➜ tmp cat nglog.log
2024/09/06 17:42:07 [emerg] 64755#64755: open() "/etc/nginx/sites-enabled/default" failed (2: No such file or directory) in /etc/nginx/nginx.conf:61
2024/09/06 17:44:59 [notice] 65501#65501: using inherited sockets from ""
tmp py3 errmv.py
➜ tmp cat error.log
2024/09/06 17:42:07 [emerg] 64755#64755: open() "/etc/nginx/sites-enabled/default" failed (2: No such file or directory) in /etc/nginx/nginx.conf:61
py3 批量修改文件后缀名 或者去掉后缀名
#!/usr/bin/python3
# -*- coding:utf-8 -*-
# python批量更换后缀名 注意 需要把脚本放在与修改的文件同一目录下,因为filenames只是获取了程序目录的文件名。
import os
# 列出当前目录下所有的文件
files = os.listdir('.')
for filename in files:
portion = os.path.splitext(filename)
# 如果后缀是.dat
if portion[1] == ".txt":
# 这里是把 .txt去掉 重新组合文件名和后缀名
newname = portion[0]
#txt 文件改为sh 要用这个记得打开下行注释
#newname = portion[0] + ".sh"
#os.rename(filename,newname)
os.replace(filename,newname)
# As of Python version 3.3 and later, it is generally preferred to use os.replace instead of os.rename so FileExistsError is not raised if the destination file already exists.
#!/usr/bin/python3
# -*- coding:utf-8 -*-
# python批量更换后缀名 进入目录的版本 利用os.chdir
import os
import sys
os.chdir(r'~/tmp')
# 列出当前目录下所有的文件
files = os.listdir('./')
print('files',files)
for filename in files:
portion = os.path.splitext(filename)
# 如果后缀是.txt
if portion[1] == ".txt":
# 这里是把 .txt去掉 重新组合文件名和后缀名
newname = portion[0]
#txt 文件改为sh 要用这个记得打开下行注释
#newname = portion[0] + ".sh"
os.rename(filename,newname)
#os.rename(filename,newname)
python批量更换后缀名 会返回脚本所在目录的版本
#!/usr/bin/python3
# -*- coding:utf-8 -*-
# python批量更换后缀名 会返回脚本所在目录的版本
import os
import sys
# Set the path
path ='~'
# save current working directory
saved_cwd=os.getcwd()
# change your cwd to the directory which contains files
os.chdir(path)
# 列出当前目录下所有的文件
files = os.listdir('./')
print('files',files)
for filename in files:
portion = os.path.splitext(filename)
# 如果后缀是.txt
if portion[1] == ".txt":
# 这里是把 .txt去掉 重新组合文件名和后缀名
newname = portion[0]
#txt 文件改为sh 要用这个记得打开下行注释
#newname = portion[0] + ".sh"
os.rename(filename,newname)
#os.rename(filename,newname)
os.chdir(saved_cwd)
# https://stackoverflow.com/questions/2491222/how-to-rename-a-file-using-python#2491232
eg
写个多线程登录,同步文件,执行命令,拿返回结果,记录日志这几个功能
import paramiko
import threading
import logging
import subprocess
import os
from scp import SCPClient
logging.basicConfig(filename='task.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# logging.basicConfig(filename='task.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class RemoteClient:
def __init__(self,host, port, username,password):
self.host=host
self.port = port
self.username = username
self.password = password
self.ssh_client = paramiko.SSHClient()
self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
def connect(self):
try:
self.ssh_client.connect(self.host,self.port,self.username, self.password,timeout=10)
logging.info(f"成功连接到 {self.host}")
return True
except Exception as e:
logging.error(f"连接失败 {self.host}: {str(e)}")
return False
def execute_command(self,command):
try:
stdin,stdout,stderr = self.ssh_client.exec_command(command)
ouoput = stdout.read().decode()
error = stderr.read().decode()
logging.info(f"在 {self.host} 执行命令: {command}")
logging.info(f"命令输出: {output}")
if error:
logging.error(f"错误信息: {error}")
return output
except Exception as e:
logging.error(f"执行命令失败 {self.host}: {str(e)}")
return None
def sync_files(self,local_path,remote_path):
try:
with SCPClient(self.ssh_client.get_transport()) as scp:
scp.put(local_path,remote_path,recursive=True)
logging.info(f"文件同步成功 {local_path} -> {self.host}:{remote_path}")
except Exception as e:
logging.error(f"文件同步失败 {self.host}: {str(e)}")
def close(self):
self.ssh_client.close()
logging.info(f"断开连接 {self.host}")
def task(host,port,username,password,local_path,remote_path,command):
client =RemoteClient(host,port,username,password)
if client.connect():
client.sync_files(local_path,remote_path)
resuslt = client.execute_command(command)
print(f"服务器 {host} 命令执行结果 \n{resuslt}")
client.close()
servers = [
{"host": "192.168.10.5","port":22, "username": "root", "password": "2xxx","local_path": "test.txt","remote_path": "/tmp/test.txt","command": "cat /tmp/test.txt"},
{"host": "192.168.10.6","port":22, "username": "root", "password": "2xxxx","local_path": "test.txt","remote_path": "/root/test.txt","command": "cat /tmp/test.txt"},
]
threads = []
for server in servers:
t = threading.Thread(target=task,args=(
server["host"],server["port"],server["username"],server["password"],
server["local_path"],server["remote_path"],server["command"]
))
threads.append(t)
t.start()
for t in threads:
t.join()
print("所有任务执行完成,日志已记录")
python 写个执行命令
v1
import subprocess
result = subprocess.run(["ls","-l"],capture_output=True,text=True)
print(result.stdout)
print(result.stderr)
v2
import subprocess
def run_command(command):
try:
result = subprocess.run(command,shell=True,capture_output=True,text=True,check=True)
return result.stdout
except subprocess.CalledProcessError as e:
return f"Error: {e.stderr}"
output =run_command("ls")
print(output)
py 同步文件
os.path.splitext
os.path.splitext(“文件路径”) 分离文件名与扩展名;默认返回(fname,fextension)元组,可做分片操作
https://www.cnblogs.com/rychh/articles/9745932.html
https://blog.csdn.net/u011509971/article/details/70244688
eg
at getip.py
#!/usr/bin/python3
import requests
print(requests.get('http://ifconfig.me/ip', timeout=1).text.strip())
cat checkng.py
#!/usr/bin/python3
# this on
#checkng.py
import requests
url="http://54.215.65.27"
response = requests.get(url)
if str(response) == '<Response [200]>':
print('请求200,nginx 容器正常')