您当前的位置:首页 > IT编程 > python
| C语言 | Java | VB | VC | python | Android | TensorFlow | C++ | oracle | 学术与代码 | cnn卷积神经网络 | gnn | 图像修复 | Keras | 数据集 | Neo4j | 自然语言处理 | 深度学习 | 医学CAD | 医学影像 | 超参数 | pointnet | pytorch | 异常检测 | Transformers | 情感分类 | 知识图谱 |

自学教程:Python如何实现Paramiko的二次封装

51自学网 2021-10-30 23:02:49
  python
这篇教程Python如何实现Paramiko的二次封装写得很实用,希望能帮到您。

Paramiko是一个用于执行SSH命令的Python第三方库,使用该库可实现自动化运维的所有任务,如下是一些常用代码的封装方式,多数代码为半成品,只是敲代码时的备份副本防止丢失,仅供参考。

目前本人巡检百台设备完全无压力,如果要巡检过千台则需要多线程的支持,过万台则需要加入智能判断等。

实现命令执行: 直接使用过程化封装,执行CMD命令.

import paramikossh = paramiko.SSHClient()ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())def BatchCMD(address,username,password,port,command):  try:    ssh.connect(hostname=address,username=username,password=password,port=port,timeout=2)    stdin , stdout , stderr = ssh.exec_command(command)    result = stdout.read()    if len(result) != 0:      result = str(result).replace("//n", "/n")      result = result.replace("b'", "").replace("'", "")      return result    else:      return None  except Exception:    return None

 实现磁盘巡检: 获取磁盘空间并返回字典格式

def GetAllDiskSpace(address,username,password,port):  ref_dict = {}  cmd_dict = {"Linux/n" : "df | grep -v 'Filesystem' | awk '{print $5 /":/" $6}'",        "AIX/n" : "df | grep -v 'Filesystem' | awk '{print $4 /":/" $7}'"        }  # 首先检测系统版本  os_version = BatchCMD(address,username,password,port,"uname")  for version,run_cmd in cmd_dict.items():    if(version == os_version):      # 根据不同版本选择不同的命令      os_ref = BatchCMD(address,username,password,port,run_cmd)      ref_list= os_ref.split("/n")      # 循环将其转换为字典      for each in ref_list:        # 判断最后是否为空,过滤最后一项        if each != "":          ref_dict[str(each.split(":")[1])] = str(each.split(":")[0])  return ref_dict# 磁盘巡检总函数def DiskMain():  with open("db.json", "r", encoding="utf-8") as read_fp:    load_json = read_fp.read()    js = json.loads(load_json)    base = js.get("base")    count = len(base)    for each in range(0,count):      print("/033[37m-/033[0m" * 80)      print("/033[35m 检测地址: {0:10} /t 用户名: {1:10} /t 密码: {2:10} /t 端口: {3:4}/033[0m".        format(base[each][1],base[each][2],base[each][3],base[each][4]))      print("/033[37m-/033[0m" * 80)      ref = GetAllDiskSpace(base[each][1],base[each][2],base[each][3],base[each][4])      for k,v in ref.items():        # 判断是否存在空盘        if( v.split("%")[0] != "-"):          # 将占用百分比转换为整数          space_ret = int(v.split("%")[0])          if space_ret >= 70:            print("/033[31m 磁盘分区: {0:30} /t 磁盘占用: {1:5} /033[0m".format(k,v))            continue          if space_ret >= 50:            print("/033[33m 磁盘分区: {0:30} /t 磁盘占用: {1:5} /033[0m".format(k, v))            continue          else:            print("/033[34m 磁盘分区: {0:30} /t 磁盘占用: {1:5} /033[0m".format(k, v))            continue      print()# 组内传递用户名密码时调用此方法def GroupDiskMain(address,username,password,port):  ref = GetAllDiskSpace(address,username,password,port)  for k, v in ref.items():    if (v.split("%")[0] != "-"):      space_ret = int(v.split("%")[0])      if space_ret >= 70:        print("磁盘分区: {0:30} /t 磁盘占用: {1:5} -> [警告]".format(k, v))        continue      if space_ret >= 50:        print("磁盘分区: {0:30} /t 磁盘占用: {1:5} -> [警惕]".format(k, v))        continue      else:        print("磁盘分区: {0:30} /t 磁盘占用: {1:5} -> [正常]".format(k, v))        continue  print()

获取系统内存利用率: 获取系统内存利用率

def GetAllMemSpace(address,username,password,port):  cmd_dict = {"Linux/n" : "cat /proc/meminfo | head -n 2 | awk '{print $2}' | xargs | awk '{print $1 /":/" $2}'",        "AIX/n" : "df | grep -v 'Filesystem' | awk '{print $4 /":/" $7}'"        }  # 首先检测系统版本  os_version = BatchCMD(address,username,password,port,"uname")  for version,run_cmd in cmd_dict.items():    if(version == os_version):      # 根据不同版本选择不同的命令      os_ref = BatchCMD(address,username,password,port,run_cmd)      # 首先现将KB转化为MB      mem_total = math.ceil( int(os_ref.split(":")[0].replace("/n","")) / 1024)      mem_free = math.ceil(int(os_ref.split(":")[1].replace("/n","")) / 1024)      mem_used = str( int(mem_total) - int(mem_free))      # 计算占用空间百分比      percentage = 100 - int(mem_free / int(mem_total / 100))      print("内存总计空间: {}".format(str(mem_total) + " MB"))      print("内存剩余空间: {}".format(str(mem_free) + " MB"))      print("内存已用空间: {}".format(str(mem_used) + " MB"))      print("计算百分比: {}".format(str(percentage) + " %"))

获取系统进程信息: 获取系统进程信息,并返回字典格式

def GetAllProcessSpace(address,username,password,port):  ref_dict = {}  cmd_dict = {"Linux/n" : "ps aux | grep -v 'USER' | awk '{print $2 /":/" $11}' | uniq",        "AIX/n" : "ps aux | grep -v 'USER' | awk '{print $2 /":/" $12}' | uniq"        }  os_version = BatchCMD(address,username,password,port,"uname")  for version,run_cmd in cmd_dict.items():    if(version == os_version):      os_ref = BatchCMD(address, username, password, port, run_cmd)      ref_list = os_ref.split("/n")      for each in ref_list:        if each != "":          ref_dict[str(each.split(":")[0])] = str(each.split(":")[1])  return ref_dict# 巡检进程是否存在def ProcessMain():  with open("db.json", "r", encoding="utf-8") as read_fp:    load_json = read_fp.read()    js = json.loads(load_json)    process = js.get("process")    process_count = len(process)    for x in range(0,process_count):      # 根据process中的值查询base中的账号密码      base = js.get("base")      if( list(process[x].keys())[0] == base[x][0] ):        # 拿到账号密码之后再提取出他们的进程ID于进程名        print("/033[37m-/033[0m" * 80)        print("/033[35m 检测地址: {0:10} /t 用户名: {1:10} /t 密码: {2:10} /t 端口: {3:4}/033[0m".           format(base[x][1], base[x][2], base[x][3], base[x][4]))        print("/033[37m-/033[0m" * 80)        ref_dic = GetAllProcessSpace(base[x][1],base[x][2],base[x][3],base[x][4])        # ref_val = 全部进程列表 proc_val = 需要检测的进程列表        ref_val = list(ref_dic.values())        proc_val = list(process[x].values())[0]        # 循环比较是否在列表中        for each in proc_val:          flag = each in ref_val          if(flag == True):            print("/033[34m 进程: {0:50} 状态: √ /033[0m".format(each))          else:            print("/033[31m 进程: {0:50} 状态: × /033[0m".format(each))

实现剧本运行功能: 针对特定一台主机运行剧本功能,随便写的一个版本,仅供参考

def RunRule(address,username,password,port,playbook):  os_version = BatchCMD(address,username,password,port,"uname")  if(os_version == list(playbook.keys())[0]):    play = list(playbook.values())[0]    print()    print("/033[37m-/033[0m" * 130)    print("/033[35m 系统类型: {0:4} /t 地址: {1:10} /t 用户名: {2:10} /t 密码: {3:15} /t 端口: {4:4}/033[0m"       .format(os_version.replace("/n",""),address,username,password,port))    print("/033[37m-/033[0m" * 130)    for each in range(0,len(play)):      RunCmd = play[each] + " > /dev/null 2>&1 && echo $?"      print("/033[30m [>] 派发命令: {0:100} /t 状态: {1:5} /033[0m".format(        RunCmd.replace(" > /dev/null 2>&1 && echo $?", ""),"正在派发"))      os_ref = BatchCMD(address, username, password, port, RunCmd)      if(os_ref == "0/n"):        print("/033[34m [√] 运行命令: {0:100} /t 状态: {1:5} /033[0m".format(          RunCmd.replace(" > /dev/null 2>&1 && echo $?",""),"派发完成"))      else:        print("/033[31m [×] 运行命令: {0:100} /t 状态: {1:5} /033[0m".format(          RunCmd.replace(" > /dev/null 2>&1 && echo $?",""),"派发失败"))        # 既然失败了,就把剩下的也打出来吧,按照失败处理        for x in range(each+1,len(play)):          print("/033[31m [×] 运行命令: {0:100} /t 状态: {1:5} /033[0m".format(            play[x].replace(" > /dev/null 2>&1 && echo $?", ""), "终止执行"))        break  else:    return 0# 批量: 传入主机组不同主机执行不同剧本def RunPlayBook(HostList,PlayBook):  count = len(HostList)  error = []  success = []  for each in range(0,count):    ref = RunRule(HostList[each][0],HostList[each][1],HostList[each][2],HostList[each][3],PlayBook)    if ref == 0:      error.append(HostList[each][0])    else:      success.append(HostList[each][0])  print("/n/n")  print("-" * 130)  print("执行清单")  print("-" * 130)  for each in success:    print("成功主机: {}".format(each))  for each in error:    print("失败主机: {}".format(each))# 运行测试def PlayBookRun():  playbook = /    {      "Linux/n":        [          "ifconfig",          "vmstat",          "ls",          "netstat -an",          "ifconfis",          "cat /etc/passwd | grep 'root' | awk '{print $2}'"        ]    }  addr_list = /    [      ["192.168.1.127", "root", "1233", "22"],      ["192.168.1.126", "root", "1203", "22"]    ]  # 指定addr_list这几台机器执行playbook剧本  RunPlayBook(addr_list,playbook)

过程化实现文件上传下载: 文件传输功能 PUT上传 GET下载

def BatchSFTP(address,username,password,port,soruce,target,flag):  transport = paramiko.Transport((address, int(port)))  transport.connect(username=username, password=password)  sftp = paramiko.SFTPClient.from_transport(transport)  if flag == "PUT":    try:      ret = sftp.put(soruce, target)      if ret !="":        transport.close()        return 1      else:        transport.close()        return 0      transport.close()    except Exception:      transport.close()      return 0  elif flag == "GET":    try:      target = str(address + "_" + target)      os.chdir("./recv_file")      ret = sftp.get(soruce, target)      if ret != "":        transport.close()        return 1      else:        transport.close()        return 0      transport.close()    except Exception:      transport.close()      return 0# 批量将本地文件 source 上传到目标 target 中def PutRemoteFile(source,target):  with open("db.json", "r", encoding="utf-8") as fp:    load_json = fp.read()    js = json.loads(load_json)    base = js.get("base")    print("-" * 130)    print("接收主机 /t/t 登录用户 /t 登录密码 /t 登录端口 /t 本地文件 /t/t 传输到 /t/t/t 传输状态")    print("-" * 130)    for each in range(0,len(base)):      # 先判断主机是否可通信      ref = BatchCMD(base[each][1], base[each][2], base[each][3], base[each][4],"uname")      if ref == None:        print("/033[31m{0:15} /t {1:6} /t {2:10} /t {3:3} /t {4:10} /t {5:10} /t 未连通/033[0m".format(          base[each][1],base[each][2],base[each][3],base[each][4],source,target))        continue      ref = BatchSFTP(base[each][1],base[each][2],base[each][3],base[each][4],source,target,"PUT")      if(ref == 1):        print("/033[34m{0:15} /t {1:6} /t {2:10} /t {3:3} /t {4:10} /t {5:10} /t 传输成功/033[0m".format(          base[each][1],base[each][2],base[each][3],base[each][4],source,target))      else:        print("/033[31m{0:15} /t {1:6} /t {2:10} /t {3:3} /t {4:10} /t {5:10} /t 传输失败/033[0m".format(          base[each][1], base[each][2], base[each][3], base[each][4], source, target))# 批量将目标文件拉取到本地特定目录(存在缺陷)def GetRemoteFile(source,target):  with open("db.json", "r", encoding="utf-8") as fp:    load_json = fp.read()    js = json.loads(load_json)    base = js.get("base")    print("-" * 130)    print("发送主机 /t/t 登录用户 /t 登录密码 /t 登录端口 /t/t 远程文件 /t/t 拉取到 /t/t/t 传输状态")    print("-" * 130)    for each in range(0,len(base)):      ref = BatchCMD(base[each][1], base[each][2], base[each][3], base[each][4], "uname")      if ref == None:        print("/033[31m{0:15} /t {1:6} /t {2:10} /t {3:3} /t {4:10} /t {5:10} /t 未连通/033[0m".format(          base[each][1], base[each][2], base[each][3], base[each][4], source, target))        continue      ref = BatchSFTP(base[each][1],base[each][2],base[each][3],base[each][4],source,target,"GET")      if(ref == 1):        print("/033[34m{0:15} /t {1:6} /t {2:10} /t {3:3} /t {4:10} /t {5:10} /t 传输成功/033[0m".format(          base[each][1],base[each][2],base[each][3],base[each][4],source,target))      else:        print("/033[31m{0:15} /t {1:6} /t {2:10} /t {3:3} /t {4:10} /t {5:10} /t 传输失败/033[0m".format(          base[each][1], base[each][2], base[each][3], base[each][4], source, target))

另一种命令执行方法:

import paramikossh = paramiko.SSHClient()ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())def BatchCMD(address,username,password,port,command):  try:    ssh.connect(hostname=address,username=username,password=password,port=port,timeout=2)    stdin , stdout , stderr = ssh.exec_command(command)    result = stdout.read()    if len(result) != 0:      return result    else:      return -1  except Exception:    return -1# 通过获取主机Ping状态def GetPing():  fp = open("unix_base.db", "r", encoding="utf-8")  count = len(open("unix_base.db", "r", encoding="utf-8").readlines())  print("-" * 100)  print("{0:20} /t {1:10} /t {2:13} /t {3:5} /t {4:9} /t {5:40}".format("IP地址","机器系统","设备SN","机房位置","存活状态","主机作用"))  print("-" * 100)  for each in range(count):    ref = eval(fp.readline())    ret = BatchCMD(ref[0],ref[5],ref[6],22,"pwd | echo $?")    if(int(ret)==0):      print("{0:20} /t {1:10} /t {2:11} /t {3:5} /t {4:9} /t {5:40}".         format(ref[0],ref[1],ref[2],ref[3],"正常",ref[4]))    else:      print("{0:20} /t {1:10} /t {2:13} /t {3:5} /t {4:9} /t {5:40}".         format(ref[0],ref[1],ref[2],ref[3],"异常",ref[4]))  fp.close()# ps aux | grep "usbCfgDev" | grep -v "grep" | awk {'print $2'}def GetProcessStatus():  fp = open("unix_process.db", "r", encoding="utf-8")  count = len(open("unix_process.db", "r", encoding="utf-8").readlines())  for each in range(count):    proc = eval(fp.readline())    proc_len = len(proc)    print("-" * 70)    print("---> 巡检地址: {0:10} /t 登录用户: {1:7} /t 登录密码: {2:10}".format(proc[0],proc[1],proc[2]))    print("-" * 70)    for process in range(3, proc_len):      command = "ps aux | grep /'{}/' | grep -v /'grep/' | awk '{}' | head -1".format(proc[process],"{print $2}")      try:        ref = BatchCMD(proc[0],proc[1],proc[2],22,command)        if(int(ref)!=-1):          print("进程: {0:18} /t PID: {1:10} /t 状态: {2}".format(proc[process], int(ref),"√"))        else:          print("进程: {0:18} /t PID:{1:10} /t 状态: {2}".format(proc[process], 0,"×"))      except Exception:        print("进程: {0:18} /t PID:{1:10} /t 状态: {2}".format(proc[process], 0,"×"))    print()  fp.close()def GetDiskStatus():  fp = open("unix_disk.db", "r", encoding="utf-8")  count = len(open("unix_disk.db", "r", encoding="utf-8").readlines())  for each in range(count):    proc = eval(fp.readline())    proc_len = len(proc)    print("-" * 100)    print("---> 巡检地址: {0:10} /t 登录系统: {1:7} /t 登录账号: {2:10} 登录密码: {3:10}".       format(proc[0],proc[1],proc[2],proc[3]))    print("-" * 100)    try:      ref = BatchCMD(proc[0], proc[2], proc[3], 22, "df | grep -v 'Filesystem'")      st = str(ref).replace("//n", "/n")      print(st.replace("b'", "").replace("'", ""))    except Exception:      pass    print()  fp.close()# 运行命令def RunCmd(command,system):  fp = open("unix_disk.db", "r", encoding="utf-8")  count = len(open("unix_disk.db", "r", encoding="utf-8").readlines())  for each in range(count):    proc = eval(fp.readline())    proc_len = len(proc)    if proc[1] == system:      print("-" * 100)      print("---> 巡检地址: {0:10} /t 登录系统: {1:7} /t 登录账号: {2:10} 登录密码: {3:10}".         format(proc[0],proc[1],proc[2],proc[3]))      print("-" * 100)      try:        ref = BatchCMD(proc[0], proc[2], proc[3], 22, command)        st = str(ref).replace("//n", "/n")        print(st.replace("b'", "").replace("'", ""))      except Exception:        pass  fp.close()

面向对象的封装方法: 使用面向对象封装,可极大的提高复用性。

import paramikoclass MySSH:  def __init__(self,address,username,password,default_port = 22):    self.address = address    self.default_port = default_port    self.username = username    self.password = password    self.obj = paramiko.SSHClient()    self.obj.set_missing_host_key_policy(paramiko.AutoAddPolicy())    self.obj.connect(self.address,self.default_port,self.username,self.password)    self.objsftp = self.obj.open_sftp()  def BatchCMD(self,command):    stdin , stdout , stderr = self.obj.exec_command(command)    result = stdout.read()    if len(result) != 0:      result = str(result).replace("//n", "/n")      result = result.replace("b'", "").replace("'", "")      return result    else:      return None  def GetRemoteFile(self,remotepath,localpath):    self.objsftp.get(remotepath,localpath)  def PutLocalFile(self,localpath,remotepath):    self.objsftp.put(localpath,remotepath)  def GetFileSize(self,file_path):    ref = self.BatchCMD("du -s " + file_path + " | awk '{print $1}'")    return ref  def CloseSSH(self):    self.objsftp.close()    self.obj.close()if __name__ == '__main__':  ssh = MySSH('192.168.191.3','root','1233',22)  ref = ssh.BatchCMD("ifconfig")  print(ref)  sz = ssh.GetFileSize("/etc/passwd")  print(sz)  ssh.CloseSSH()第二次封装完善。import paramiko,os,json,reclass MySSH:  def __init__(self,address,username,password,default_port = 22):    self.address = address    self.default_port = default_port    self.username = username    self.password = password    try:      self.obj = paramiko.SSHClient()      self.obj.set_missing_host_key_policy(paramiko.AutoAddPolicy())      self.obj.connect(self.address,self.default_port,self.username,self.password,timeout=3,allow_agent=False,look_for_keys=False)      self.objsftp = self.obj.open_sftp()    except Exception:      pass  def BatchCMD(self,command):    try:      stdin , stdout , stderr = self.obj.exec_command(command,timeout=3)      result = stdout.read()      if len(result) != 0:        result = str(result).replace("//n", "/n")        result = result.replace("b'", "").replace("'", "")        return result      else:        return None    except Exception:      return None  def GetRemoteFile(self,remote_path,local_path):    try:      self.objsftp.get(remote_path,local_path)      return True    except Exception:      return False  def PutLocalFile(self,localpath,remotepath):    try:      self.objsftp.put(localpath,remotepath)      return True    except Exception:      return False  def CloseSSH(self):    self.objsftp.close()    self.obj.close()  # 获取文件大小  def GetFileSize(self,file_path):    ref = self.BatchCMD("du -s " + file_path + " | awk '{print $1}'")    return ref.replace("/n","")  # 判断文件是否存在  def IsFile(self,file_path):    return self.BatchCMD("[ -e {} ] && echo 'True' || echo 'False'".format(file_path))

通过Eval函数解析执行: 自定义语法规则与函数,通过Eval函数实现解析执行. 没写完,仅供参考。

import json,os,sys,mathimport argparse,time,reimport paramikossh = paramiko.SSHClient()ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())def BatchCMD(address,username,password,port,command):  try:    ssh.connect(hostname=address,username=username,password=password,port=port,timeout=2)    stdin , stdout , stderr = ssh.exec_command(command)    result = stdout.read()    if len(result) != 0:      result = str(result).replace("//n", "/n")      result = result.replace("b'", "").replace("'", "")      return result    else:      return None  except Exception:    return None# ------------------------------------------------------------------------# 内置解析方法def GetDisk(x):  return str(x)def GetCPULoad():  return str(10)# 句式解析器,解析句子并执行def judge(string):  # 如果匹配到IF则执行判断条件解析  if re.findall(r'IF{ (.*?) }', string, re.M) != []:    # 则继续提取出他的表达式    ptr = re.compile(r'IF[{] (.*?) [}]',re.S)    subject = re.findall(ptr,string)[0]    subject_list = subject.split(" ")    # 拼接语句并执行    sentence = eval(subject_list[0]) + subject_list[1] + subject_list[2]    # 组合后执行,返回结果    if eval(sentence):      return "IF"    else:      return False  # 如果匹配到put则执行上传解析  elif re.findall(r'PUT{ (.*?) }', string, re.M) != []:    print("put")  return False# 获取特定目录下所有的剧本def GetAllRule():  rootdir = os.getcwd() + "//rule//"  all_files = [f for f in os.listdir(rootdir)]  print("-" * 90)  print("{0:15} /t {1:10} /t {2:10} /t {3:5} /t {4:5}".format("剧本名称","应用平台","应用端口","执行主机数","命令条数"))  print("-" * 90)  for switch in all_files:    # 首先判断文件结尾是否为Json    if( switch.endswith(".json") == True):      all_switch_dir = rootdir + switch      try:        # 判断文件内部是否符合JSON规范        with open(all_switch_dir , "r" ,encoding="utf-8") as read_file:          # 判断是否存在指定字段来识别规范          load = json.loads(read_file.read())          if load.get("framework") != None and load.get("task_sequence") != None:            run_addr_count = len(load.get("address_list"))            command_count = len(load.get("task_sequence"))            print("{0:15} /t {1:10} {2:10} /t/t {3:5} /t/t {4:5}".               format(switch,load.get("framework"),load.get("default_port"),run_addr_count,command_count))      except ValueError:        pass# 指定一个剧本并运行def RunPlayBook(rule_name):  rootdir = os.getcwd() + "//rule//"  all_files = [f for f in os.listdir(rootdir)]  for switch in all_files:    if( switch.endswith(".json") == True):      all_switch_dir = rootdir + switch      # 寻找到需要加载的剧本地址      if( switch == rule_name):        with open(all_switch_dir , "r" ,encoding="utf-8") as read_file:          data = json.loads(read_file.read())          address_list = data.get("address_list")          # 循环每个主机任务          for each in address_list:            # 得到剧本内容            task_sequence = data.get("task_sequence")            default_port = data.get("default_port")            print("-" * 90)            print("地址: {0:15} 用户名: {1:10} 密码: {2:10}".format(each[0],each[1],each[2]))            print("-" * 90)            for task in task_sequence:              flag = judge(task[0])              if flag == "IF":                ref = BatchCMD(each[0],each[1],each[2],default_port,task[1])                print(ref)              elif flag == False:                ref = BatchCMD(each[0],each[1],each[2],default_port,task[0])                print(ref)if __name__ == "__main__":  RunPlayBook("get_log.json")

定义剧本规范如下。

{ "framework": "Centos", "version": "7.0", "address_list": [  ["192.168.191.3","root","1233"] ], "default_port": "22", "task_sequence": [  ["ifconfig"],  ["IF{ GetLastCmdFlag() == True }","uname"] ]}

词法分析: 词法分析解析剧本内容。

# 获取特定目录下所有的剧本def GetAllRule():  rootdir = os.getcwd() + "//rule//"  all_files = [f for f in os.listdir(rootdir)]  print("-" * 90)  print("{0:15} /t {1:10} /t {2:10} /t {3:5} /t {4:5}".format("剧本名称","应用平台","应用端口","执行主机数","命令条数"))  print("-" * 90)  for switch in all_files:    # 首先判断文件结尾是否为Json    if( switch.endswith(".json") == True):      all_switch_dir = rootdir + switch      try:        # 判断文件内部是否符合JSON规范        with open(all_switch_dir , "r" ,encoding="utf-8") as read_file:          # 判断是否存在指定字段来识别规范          load = json.loads(read_file.read())          if load.get("framework") != None and load.get("task_sequence") != None:            run_addr_count = len(load.get("address_list"))            command_count = len(load.get("task_sequence"))            print("{0:15} /t {1:10} {2:10} /t/t {3:5} /t/t {4:5}".               format(switch,load.get("framework"),load.get("default_port"),run_addr_count,command_count))      except ValueError:        pass# 句式解析器,解析句子并执行def judge(string):  # 如果匹配到IF则执行判断条件解析  if re.findall(r'IF{ (.*?) }', string, re.M) != []:    # 则继续提取出他的表达式    ptr = re.compile(r'IF[{] (.*?) [}]',re.S)    subject = re.findall(ptr,string)[0]    subject_list = subject.split(" ")    # 公开接口,执行命令    ssh = MySSH("192.168.191.3","root","1233","22")    # 组合命令并执行    sentence = str(eval(subject_list[0]) + subject_list[1] + subject_list[2])    if eval(sentence):      return "IF",ssh    else:      return False  # 如果匹配到put则执行上传解析  elif re.findall(r'PUT{ (.*?) }', string, re.M) != []:    print("put")  return False# 指定一个剧本并运行def RunPlayBook(rule_name):  rootdir = os.getcwd() + "//rule//"  all_files = [f for f in os.listdir(rootdir)]  for switch in all_files:    if( switch.endswith(".json") == True):      all_switch_dir = rootdir + switch      # 寻找到需要加载的剧本地址      if( switch == rule_name):        with open(all_switch_dir , "r" ,encoding="utf-8") as read_file:          data = json.loads(read_file.read())          address_list = data.get("address_list")          # 循环每个主机任务          for each in address_list:            # 得到剧本内容            task_sequence = data.get("task_sequence")            default_port = data.get("default_port")            print("-" * 90)            print("地址: {0:15} 用户名: {1:10} 密码: {2:10}".format(each[0],each[1],each[2]))            print("-" * 90)            for task in task_sequence:              flag,obj = judge(task[0])              if flag == "IF":                ret = obj.BatchCMD(task[1])                print(ret)if __name__ == '__main__':  ret = judge("IF{ ssh.GetFileSize('/etc/passwd') >= 4 }")  print(ret)

MySSH类最终封装: 通过面向对象对其进行封装,实现了查询CPU,负载,内存利用率,磁盘容量,等通用数据的获取。

import paramiko, math,jsonclass MySSH:  def __init__(self, address, username, password, default_port):    self.address = address    self.default_port = default_port    self.username = username    self.password = password  # 初始化,远程模块  def Init(self):    try:      self.ssh_obj = paramiko.SSHClient()      self.ssh_obj.set_missing_host_key_policy(paramiko.AutoAddPolicy())      self.ssh_obj.connect(self.address, self.default_port, self.username, self.password, timeout=3,                 allow_agent=False, look_for_keys=False)      self.sftp_obj = self.ssh_obj.open_sftp()    except Exception:      return False  # 执行非交互命令  def BatchCMD(self, command):    try:      stdin, stdout, stderr = self.ssh_obj.exec_command(command, timeout=3)      result = stdout.read()      if len(result) != 0:        result = str(result).replace("//n", "/n")        result = result.replace("b'", "").replace("'", "")        return result      else:        return None    except Exception:      return None  # 将远程文件下载到本地  def GetRemoteFile(self, remote_path, local_path):    try:      self.sftp_obj.get(remote_path, local_path)      return True    except Exception:      return False  # 将本地文件上传到远程  def PutLocalFile(self, localpath, remotepath):    try:      self.sftp_obj.put(localpath, remotepath)      return True    except Exception:      return False  # 关闭接口  def CloseSSH(self):    try:      self.sftp_obj.close()      self.ssh_obj.close()    except Exception:      pass  # 获取文件大小  def GetFileSize(self, file_path):    ref = self.BatchCMD("du -s " + file_path + " | awk '{print $1}'")    return ref.replace("/n", "")  # 判断文件是否存在  def IsFile(self, file_path):    return self.BatchCMD("[ -e {} ] && echo 'True' || echo 'False'".format(file_path))  # 获取系统型号  def GetSystemVersion(self):    return self.BatchCMD("uname")  # 检测目标主机存活状态  def GetPing(self):    try:      if self.GetSystemVersion() != None:        return True      else:        return False    except Exception:      return False  # 获取文件列表,并得到大小  def GetFileList(self, path):    try:      ref_list = []      self.sftp_obj.chdir(path)      file_list = self.sftp_obj.listdir("./")      for sub_path in file_list:        dict = {}        file_size = self.GetFileSize(path + sub_path)        dict[path + sub_path] = file_size        ref_list.append(dict)      return ref_list    except Exception:      return False  # 将远程文件全部打包后拉取到本地  def GetTarPackageAll(self, path):    try:      file_list = self.sftp_obj.listdir(path)      self.sftp_obj.chdir(path)      for packageName in file_list:        self.ssh_obj.exec_command("tar -czf /tmp/{0}.tar.gz {0}".format(packageName))        self.sftp_obj.get("/tmp/{}.tar.gz".format(packageName), "./file/{}.tar.gz".format(packageName))        self.sftp_obj.remove("/tmp/{}.tar.gz".format(packageName))        return True    except Exception:      return True  # 获取磁盘空间并返回字典  def GetAllDiskSpace(self):    ref_dict = {}    cmd_dict = {"Linux/n": "df | grep -v 'Filesystem' | awk '{print $5 /":/" $6}'",          "AIX/n": "df | grep -v 'Filesystem' | awk '{print $4 /":/" $7}'"          }    try:      os_version = self.GetSystemVersion()      for version, run_cmd in cmd_dict.items():        if (version == os_version):          # 根据不同版本选择不同的命令          os_ref = self.BatchCMD(run_cmd)          ref_list = os_ref.split("/n")          # 循环将其转换为字典          for each in ref_list:            # 判断最后是否为空,过滤最后一项            if each != "":              ref_dict[str(each.split(":")[1])] = str(each.split(":")[0])      return ref_dict    except Exception:      return False  # 获取系统内存利用率  def GetAllMemSpace(self):    cmd_dict = {"Linux/n": "cat /proc/meminfo | head -n 2 | awk '{print $2}' | xargs | awk '{print $1 /":/" $2}'",          "AIX/n": "svmon -G | grep -v 'virtual' | head -n 1 | awk '{print $2 /":/" $4}'"          }    try:      os_version = self.GetSystemVersion()      for version, run_cmd in cmd_dict.items():        if (version == os_version):          # 根据不同版本选择不同的命令          os_ref = self.BatchCMD(run_cmd)          # 首先现将KB转化为MB          mem_total = math.ceil(int(os_ref.split(":")[0].replace("/n", "")) / 1024)          mem_free = math.ceil(int(os_ref.split(":")[1].replace("/n", "")) / 1024)          # 计算占用空间百分比          percentage = 100 - int(mem_free / int(mem_total / 100))          # 拼接字典数据          return {"Total": str(mem_total), "Free": str(mem_free), "Percentage": str(percentage)}    except Exception:      return False  # 获取系统进程信息,并返回字典格式  def GetAllProcessSpace(self):    ref_dict = {}    cmd_dict = {"Linux/n": "ps aux | grep -v 'USER' | awk '{print $2 /":/" $11}' | uniq",          "AIX/n": "ps aux | grep -v 'USER' | awk '{print $2 /":/" $12}' | uniq"          }    try:      os_version = self.GetSystemVersion()      for version, run_cmd in cmd_dict.items():        if (version == os_version):          os_ref = self.BatchCMD(run_cmd)          ref_list = os_ref.split("/n")          for each in ref_list:            if each != "":              ref_dict[str(each.split(":")[0])] = str(each.split(":")[1])      return ref_dict    except Exception:      return False  # 获取CPU利用率  def GetCPUPercentage(self):    ref_dict = {}    cmd_dict = {"Linux/n": "vmstat | tail -n 1 | awk '{print $13 /":/" $14 /":/" $15}'",          "AIX/n": "vmstat | tail -n 1 | awk '{print $14 /":/" $15 /":/" $16}'"          }    try:      os_version = self.GetSystemVersion()      for version, run_cmd in cmd_dict.items():        if (version == os_version):          os_ref = self.BatchCMD(run_cmd)          ref_list = os_ref.split("/n")          for each in ref_list:            if each != "":              each = each.split(":")              ref_dict = {"us": each[0],"sys":each[1],"idea":each[2]}      return ref_dict    except Exception:      return False  # 获取机器的负载情况  def GetLoadAVG(self):    ref_dict = {}    cmd_dict = {"Linux/n": "uptime | awk '{print $10 /":/" $11 /":/" $12}'",          "AIX/n": "uptime | awk '{print $10 /":/" $11 /":/" $12}'"          }    try:      os_version = self.GetSystemVersion()      for version, run_cmd in cmd_dict.items():        if (version == os_version):          os_ref = self.BatchCMD(run_cmd)          ref_list = os_ref.split("/n")          for each in ref_list:            if each != "":              each = each.replace(",","").split(":")              ref_dict = {"1avg": each[0],"5avg": each[1],"15avg": each[2]}              return ref_dict      return False    except Exception:      return False  # 修改当前用户密码  def SetPasswd(self,username,password):    try:      os_id = self.BatchCMD("id | awk {'print $1'}")      print(os_id)      if(os_id == "uid=0(root)/n"):        self.BatchCMD("echo '{}' | passwd --stdin '{}' > /dev/null".format(password,username))        return True    except Exception:      return False# 定义超类,集成基类MySSHclass SuperSSH(MySSH):  def __init__(self,address, username, password, default_port):    super(SuperSSH, self).__init__(address, username, password, default_port)

我们继续为上面的代码加上命令行,让其可以直接使用,这里需要遵循一定的格式规范,我们使用JSON解析数据,JSON格式如下.

{ "aix": [  ["192.168.1.1","root","123123"],  ["192.168.1.1","root","2019"], ], "suse": [  ["192.168.1.1","root","123123"], ], "centos":  [  ["192.168.1.1","root","123123"],  ]}

接着是主程序代码,如下所示.

# -*- coding: utf-8 -*-from MySSH import MySSHimport json,os,sys,argparseclass InitJson():  def __init__(self,db):    self.db_name = db  def GetPlatform(self,plat):    with open(self.db_name, "r", encoding="utf-8") as Read_Pointer:      load_json = json.loads(Read_Pointer.read())      for k,v in load_json.items():        try:          if k == plat:            return v        except Exception:          return None    return Noneif __name__ == "__main__":  ptr = InitJson("database.json")  parser = argparse.ArgumentParser()  parser.add_argument("-G","--group",dest="group",help="指定主机组")  parser.add_argument("-C","--cmd",dest="cmd",help="指定CMD命令")  parser.add_argument("--get",dest="get",help="指定获取数据类型<ping>")  parser.add_argument("--dst", dest="dst_file",help="目标位置")  parser.add_argument("--src",dest="src_file",help="原文件路径")  args = parser.parse_args()  # 批量CMD --group=aix --cmd=ls  if args.group and args.cmd:    platform = ptr.GetPlatform(args.group)    success,error = [],[]    for each in platform:      ssh = MySSH(each[0], each[1], each[2], 22)      if ssh.Init() != False:        print("-" * 140)        print("主机: {0:15} /t 账号: {1:10} /t 密码: {2:10} /t 命令: {3:30}".           format(each[0], each[1], each[2], args.cmd))        print("-" * 140)        print(ssh.BatchCMD(args.cmd))        ssh.CloseSSH()        success.append(each[0])      else:        error.append(each[0])        ssh.CloseSSH()    print("/n/n","-" * 140,       "/n 执行报告 /n",       "-" * 140,       "/n失败主机: {}/n".format(error),       "-" * 140)  # 批量获取主机其他数据 --group=centos --get=ping  if args.group and args.get:      platform = ptr.GetPlatform(args.group)      success, error = [], []      for each in platform:        ssh = MySSH(each[0], each[1], each[2], 22)        # 判断是否为ping        if ssh.Init() != False:          if args.get == "ping":            ret = ssh.GetPing()            if ret == True:              print("[*] 主机: {} 存活中.".format(each[0]))          # 收集磁盘数据          elif args.get == "disk":            print("-" * 140)            print("主机: {0:15} /t 账号: {1:10} /t 密码: {2:10}".               format(each[0], each[1], each[2]))            print("-" * 140)            ret = ssh.GetAllDiskSpace()            for k, v in ret.items():              if (v.split("%")[0] != "-"):                space_ret = int(v.split("%")[0])                if space_ret >= 70:                  print("磁盘分区: {0:30} /t 磁盘占用: {1:5} -> [警告]".format(k, v))                  continue                if space_ret >= 50:                  print("磁盘分区: {0:30} /t 磁盘占用: {1:5} -> [警惕]".format(k, v))                  continue                else:                  print("磁盘分区: {0:30} /t 磁盘占用: {1:5}".format(k, v))                  continue            print()        else:          error.append(each[0])          ssh.CloseSSH()      print("/n/n", "-" * 140,         "/n 执行报告 /n",         "-" * 140,         "/n失败主机: {}/n".format(error),         "-" * 140)  # 实现文件上传过程 --group=centos --src=./a.txt --dst=/tmp/test.txt  if args.group and args.src_file and args.dst_file:    platform = ptr.GetPlatform(args.group)    success, error = [], []    for each in platform:      ssh = MySSH(each[0], each[1], each[2], 22)      if ssh.Init() != False:        ret = ssh.PutLocalFile(args.src_file,args.dst_file)        if ret == True:          print("主机: {} /t 本地文件: {} /t ---> 传到: {}".format(each[0], args.src_file,args.dst_file))      else:        error.append(each[0])        ssh.CloseSSH()    print("/n/n", "-" * 140,       "/n 执行报告 /n",       "-" * 140,       "/n失败主机: {}/n".format(error),       "-" * 140)

简单的使用命令:

远程CMD: python main.py --group=centos --cmd="free -h | grep -v 'total'"

判断存活: python main.py --group=centos --get="ping"

拉取磁盘:python main.py --group=suse --get="disk"

批量上传文件: python main.py --group=suse --src="./aaa" --dst="/tmp/bbb.txt"

由于我的设备少,所以没开多线程,担心开多线程对目标造成过大压力,也没啥必要。

番外: 另外我研究了一个主机分组的小工具,加上命令执行代码量才800行,实现了一个分组数据库,在这里记下使用方法。

默认运行进入一个交互式shell环境。

Init = 初始化json文件,ShowHostList=显示所有主机,ShowGroup=显示所有组,ShowAllGroup=显示所有主机包括组。

添加修改与删除记录,命令如下。

添加删除主机组。

通过UUID向主机组中添加或删除主机记录。

测试主机组连通性。

以上就是Python如何实现Paramiko的二次封装的详细内容,更多关于Python实现Paramiko的二次封装的资料请关注51zixue.net其它相关文章!


python实现按日期归档文件
python多线程爬取西刺代理的示例代码
万事OK自学网:51自学网_软件自学网_CAD自学网自学excel、自学PS、自学CAD、自学C语言、自学css3实例,是一个通过网络自主学习工作技能的自学平台,网友喜欢的软件自学网站。