paramiko是一个python ssh库,提供python api去执行ssh操作,包括但不限于登录远程机器(ssh),执行远程命令,sftp上传下载文件。

# env: python3
import os
import tarfile
import subprocess
import paramiko
import paramiko.util
paramiko.util.log_to_file("paramiko.log")
 
 
def notify(*msg, **kwargs):
    print("--- ", *msg, **kwargs)
 
def tar_dir(dir_path, tar_path):
    """ Tar a directory `dir_path` to a tar file. """
    if tar_path.endswith(".gz") or tar_path.endswith(".tgz"):
        mode = "w:gz"
    elif tar_path.endswith(".bz2"):
        mode = "w:bz2"
    elif tar_path.endswith(".xz"):
        mode = "w:xz"
    else:
        mode = "w"
    with tarfile.open(tar_path, mode) as tar:
        tar.add(dir_path, arcname=os.path.basename(dir_path))
 
def untar_file(tar_path, target_dir):
    with tarfile.open(tar_path, "r") as tar:
        tar.extractall(target_dir)
 
def test_tar():
    tar_dir("./", "/home/bingbing.hu/my.tar.gz")
    untar_file("/home/bingbing.hu/my.tar.gz", "/home/bingbing.hu/mytar_gz_extracted")
 
class CommandRunException(Exception):
    pass
 
 
class NodeConf(object):
    HOST = ""
    PORT = 22
    USERNAME = "fuck"
    PASSWORD = "you"
 
class NewApiDevNode(NodeConf):
    HOST = "1.2.3.4"
 
class SSHConnection(object):
    def __init__(self, host="", password="", username="bingbing.hu", port=22) -> None:
        self.host = host
        self.password = password
        self.username = username
        self.port = port
        self._client = None      # type: paramiko.SSHClient | None
        self._sftp_client = None # type: paramiko.SFTPClient | None
 
    def close(self):
        if self._client is not None:
            self._client.close()
        if self._sftp_client is not None:
            self._sftp_client.close()
        self._client = None
        self._sftp_client = None
 
    def re_init(self, node_cls):
        """ Re-init the node with a NodeConf class. """
        self.host = node_cls.HOST
        self.password = node_cls.PASSWORD
        self.username = node_cls.USERNAME
        self.port = node_cls.PORT
        if self._client is not None:
            self._client.close()
            self._client = None
        self.get_client()
 
    def get_client(self):
        if self._client is None:
            self._client = ssh = paramiko.SSHClient()
            ssh.load_system_host_keys()
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            ssh.connect(self.host, self.port, self.username, self.password)
        return self._client
 
    def get_sftp_client(self):
        if self._sftp_client is None:
            self._sftp_client = self.get_client().open_sftp()
        return self._sftp_client
 
    def excute(self, cmd):
        stdin, stdout, stderr = self.get_client().exec_command(cmd)
        return self.__handle_cmd_output(stdout, stderr, cmd)
 
    def excute_with_sudo(self, cmd):
        stdin, stdout, stderr = self.get_client().exec_command("sudo -S %s" % cmd)
        stdin.write(self.password + "\n")
        stdin.flush()
        return self.__handle_cmd_output(stdout, stderr, cmd)
 
    @staticmethod
    def __handle_cmd_output(stdout: paramiko.ChannelFile, stderr: paramiko.ChannelFile, cmd: str) -> int:
        status = stdout.channel.recv_exit_status()
        if status != 0:
            raise CommandRunException(
                "Excute '%s' failed with status %d.\n\n%s"
                % (cmd, status, stderr.read().decode()))
        for line in stdout:
            print(line, end="")
        return status
 
    def upload(self, local_path, remote_path):
        if not os.path.exists(local_path):
            raise FileNotFoundError("Local file %s not found." % local_path)
        if remote_path[-1] == '/': # treat remote_path as a directory
            self.__ensure_remote_path(remote_path)
            filename = os.path.basename(local_path)
            remote_path += filename
        notify("Uploading %s to %s" % (local_path, remote_path))
 
        # \r is used to move cursor to the beginning of the line.
        cb = lambda x, y: notify(f"Uploaded {x}/{y}\r", end="", flush=True)
        self.get_sftp_client().put(local_path, remote_path, callback=cb)
        print('') # used to remove the last \r
 
    def download(self, remote_path, local_path):
        if remote_path[-1] == '/':
            raise ValueError("Remote path should be a file path.")
        local_file = local_path
        if local_path[-1] == '/':
            local_dir = local_path[:-1]
            local_file = os.path.join(local_path, os.path.basename(remote_path))
        else:
            local_dir = os.path.split(local_path)[0]
        if not os.path.exists(local_dir):
            os.makedirs(local_dir)
 
        sftp = self.get_sftp_client()
        notify("Downloading %s to %s" % (remote_path, local_file))
 
        # \r is used to move cursor to the beginning of the line.
        cb = lambda x, y: print(f"\rDownloaded {x}/{y}", end="", flush=True)
        sftp.get(remote_path, local_file, callback=cb)
 
    def __ensure_remote_path(self, remote_path):
        self.excute(f"""
            if [ ! -d {remote_path} ]; then
                mkdir -p {remote_path}
            fi
            """)
 
def deploy_newapi_dev():
    conn = SSHConnection()
    conn.re_init(NewApiDevNode)
 
    # code_root = os.path.expanduser("~/code/newapi")
    # print(code_root)
 
    args = ["/datayes/api_dev/bin", "api_service.tar.gz"]
    # subprocess.check_call(["make", "-C", *args])
    local_file = '/'.join(args)
    conn.upload(local_file, './')
    conn.excute(f"""
    mkdir ttt
    tar -xavf {args[1]} -C ttt
""")
    conn.close()
 
def test_connection():
    conn = SSHConnection()
    conn.re_init(NewApiDevNode)
 
    a = conn.excute("pwd;df -h")
    conn.excute_with_sudo("apt list --upgradable | head -n5")
    conn.excute("""
cd /tmp
mkdir -p test
cd test
pwd
touch {a,b,c}
ls -l""")
 
    conn.upload("main.py", "./areyouok/")
    # conn.excute("./a.out")
    # node.get_sftp_client().put("main.py", "main.py")
    conn.download("./api_etc.tar", "./from_remote.tar")
    conn.close()
    print(NewApiDevNode.PASSWORD)
    print(NewApiDevNode.USERNAME)
 
 
 
if __name__ == "__main__":
    deploy_newapi_dev()
 

closed by https://github.com/guyueshui/dotfiles/tree/master/bin/paramiko


更新下wrapper

node.py

# encoding = utf-8
# This file stores the information of remote nodes.
 
class NodeConf(object):
    HOST = ""
    PORT = 22
    USERNAME = "bingbing.hu"
    PASSWORD = "datayes@123"
 
    @classmethod
    def get_para(cls):
        return (cls.HOST, cls.PASSWORD, cls.USERNAME, cls.PORT)
 
    @classmethod
    def get_para_dict(cls):
        return {"host": cls.HOST, "password": cls.PASSWORD,
                "username": cls.USERNAME, "port": cls.PORT }
 
class NewApiDevNode(NodeConf):
    HOST = "10.24.21.23"
 
class DbStoreNode(NodeConf):
    HOST = "10.24.21.105"
 
class MdlDevNode(NodeConf):
    HOST = "10.24.21.40"
 
class DbAlchemyNodeStg(NodeConf):
    HOST = "10.24.21.25"
    USERNAME = "xufei.li"
 
class BarDev(NodeConf):
    HOST = "10.24.21.181"
    USERNAME = "xufei.li"
 
    DEPLOY_DIR = "/home/shang/mdl_bar"
 

ssh_connection.py

# encoding: utf-8
 
from tool.utils import notify
import os
import time
import paramiko
import paramiko.util
paramiko.util.log_to_file("paramiko.log")
 
 
class CommandRunException(Exception):
    pass
 
 
class SSHConnection(object):
    def __init__(self, host="", password="", username="bingbing.hu", port=22) -> None:
        self.host = host
        self.password = password
        self.username = username
        self.port = port
        self._client = None      # type: paramiko.SSHClient | None
        self._sftp_client = None # type: paramiko.SFTPClient | None
 
    def __enter__(self):
        return self
 
    def __exit__(self, exc_type, exc_val, exc_tb):
        print("type: %s" % exc_type)
        print("val: %s" % exc_val)
        print("tb: %s" % exc_tb)
        self.close()
 
    def close(self):
        if self._client is not None:
            self._client.close()
        if self._sftp_client is not None:
            self._sftp_client.close()
        self._client = None
        self._sftp_client = None
 
    def re_init(self, node_cls):
        """ Re-init the node with a NodeConf class. """
        self.host = node_cls.HOST
        self.password = node_cls.PASSWORD
        self.username = node_cls.USERNAME
        self.port = node_cls.PORT
        if self._client is not None:
            self._client.close()
            self._client = None
        self.get_client()
 
    def get_client(self):
        if self._client is None:
            self._client = ssh = paramiko.SSHClient()
            ssh.load_system_host_keys()
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            ssh.connect(self.host, self.port, self.username, self.password)
        return self._client
 
    def get_sftp_client(self):
        if self._sftp_client is None:
            self._sftp_client = self.get_client().open_sftp()
        return self._sftp_client
 
    def excute(self, cmd):
        stdin, stdout, stderr = self.get_client().exec_command(cmd)
        return self.__handle_cmd_output(stdout, stderr, cmd)
 
    def excute_with_sudo(self, cmd):
        stdin, stdout, stderr = self.get_client().exec_command("sudo -S %s" % cmd)
        stdin.write(self.password + "\n")
        stdin.flush()
        return self.__handle_cmd_output(stdout, stderr, cmd)
 
    def __excute_successive_cmds(self, cmd):
        client = self.get_client()
        chan = client.invoke_shell()
        chan.set_combine_stderr(True)
        stdin = chan.makefile_stdin("wb", -1)
        stdout = chan.makefile("r", -1)
        # chan.send("sudo -i\n")
        # chan.sendall(self.password + "\n")
        # time.sleep(.5)
        # stdin.flush()
        stdin.write("ls -al\n")
        stdin.write(f"""
sudo -i
{self.password}
""")
        time.sleep(0.5)
#         chan.send("""
# pwd
# echo $HOME
# echo $USER
# """.encode())
        # chan.sendall("pwdx 15391\n".encode())
        chan.send("exit\n".encode())
        chan.send("./a.out\n".encode())
        chan.send("exit\n".encode())
        while True:
            if chan.recv_ready():
                print(chan.recv(1024).decode())
                continue
            if chan.exit_status_ready():
                exit_status = chan.recv_exit_status()
                break
            if chan.closed or chan.eof_received or not chan.active:
                break
            time.sleep(0.5)
        print("exit_status:", exit_status)
 
    @staticmethod
    def __handle_cmd_output(stdout: paramiko.ChannelFile, stderr: paramiko.ChannelFile, cmd: str) -> int:
        for line in stdout:
            print(line, end="")
        status = stdout.channel.recv_exit_status()
        if status != 0:
            raise CommandRunException(
                "Excute '%s' failed with status %d.\n\n%s"
                % (cmd, status, stderr.read().decode()))
        return status
 
    def get_cmd_chain(self, timeout=0):
        return CommandChain(self.get_client(), timeout)
 
    def upload(self, local_path, remote_path):
        if not os.path.exists(local_path):
            raise FileNotFoundError("Local file %s not found." % local_path)
        if remote_path[-1] == '/': # treat remote_path as a directory
            self.__ensure_remote_path(remote_path)
            filename = os.path.basename(local_path)
            remote_path += filename
        notify("Uploading %s to %s" % (local_path, remote_path))
 
        # \r is used to move cursor to the beginning of the line.
        cb = lambda x, y: notify(f"Uploaded {x}/{y}\r", end="", flush=True)
        self.get_sftp_client().put(local_path, remote_path, callback=cb)
        print('') # used to remove the last \r
 
    def download(self, remote_path, local_path):
        if remote_path[-1] == '/':
            raise ValueError("Remote path should be a file path.")
        local_file = local_path
        if local_path[-1] == '/':
            local_dir = local_path[:-1]
            local_file = os.path.join(local_path, os.path.basename(remote_path))
        else:
            local_dir = os.path.split(local_path)[0]
        if not os.path.exists(local_dir):
            os.makedirs(local_dir)
 
        sftp = self.get_sftp_client()
        notify("Downloading %s to %s" % (remote_path, local_file))
 
        # \r is used to move cursor to the beginning of the line.
        cb = lambda x, y: print(f"\rDownloaded {x}/{y}", end="", flush=True)
        sftp.get(remote_path, local_file, callback=cb)
 
    def __ensure_remote_path(self, remote_path):
        self.excute(f"""
            if [ ! -d {remote_path} ]; then
                mkdir -p {remote_path}
            fi
            """)
 
 
class CommandChain(object):
    def __init__(self, ssh_client: paramiko.SSHClient, timeout=0):
        assert ssh_client is not None
        self._client = ssh_client
        self._chan = ssh_client.invoke_shell(self.__class__.__name__) # type: paramiko.Channel
        time.sleep(1) # wait the channel to be ready
        # self._chan.setblocking(0)
        # self._chan.set_combine_stderr(True)
        self._stdin = self._chan.makefile_stdin("wb", -1)
        self._exit_status = 0
        self._timeout = timeout
 
    def over(self):
        time.sleep(0.5) # essential, otherwise the command will be blocked
        # while not self._chan.exit_status_ready():
        #     self.execute("exit")
        self.__handle_output()
        self._stdin.close()
        self._chan.close()
        self._client = None
 
    def execute(self, one_line_cmd: str):
        one_line_cmd = one_line_cmd.rstrip('\n') + '\n'
        self._chan.send(one_line_cmd.encode())
        return self
 
    def write_input(self, text: str):
        text = text.rstrip('\n') + '\n'
        self._stdin.write(text.encode())
        return self
 
    def __handle_output(self):
        chan = self._chan
        # If you don't set timeout, the stdout.read will block,
        # or you can send multiple "exit" to remote node,
        # this will make chan.exit_status_ready returns true.
        if self._timeout > 0:
            chan.settimeout(5)
        stdout = chan.makefile("r", -1)
        try:
            for line in stdout:
                print(line, end="")
        except Exception as e:
            notify("no data in %ss, channel will be closed!" % self._timeout)
 
 
def test_connection():
    from tool.node import NewApiDevNode
    conn = SSHConnection()
    conn.re_init(NewApiDevNode)
 
    conn.excute("pwd;df -h")
    conn.excute_with_sudo("apt list --upgradable | head -n5")
    conn.excute("""
cd /tmp
mkdir -p test
cd test
pwd
touch {a,b,c}
ls -l""")
 
    conn.upload("main.py", "./areyouok/")
    # conn.excute("./a.out")
    # node.get_sftp_client().put("main.py", "main.py")
    conn.download("./api_etc.tar", "./from_remote.tar")
    conn.close()
 
 
def test_cmd_chain():
    from tool.node import NewApiDevNode
    conn = SSHConnection()
    conn.re_init(NewApiDevNode)
 
    conn.get_cmd_chain().execute("echo $USER")\
        .execute("sudo -i").write_input(conn.password)\
        .execute("echo $USER").execute("apt list --upgradable|head -5")\
        .over()
 
 
if __name__ == "__main__":
    # test_connection()
    test_cmd_chain()

utils.py

# encoding: utf-8
 
import os
import tarfile
 
def notify(*msg, **kwargs):
    print("---", *msg, **kwargs)
 
def __tar_mode_helper(tar_path: str):
    if tar_path.endswith(".gz") or tar_path.endswith(".tgz"):
        mode = "w:gz"
    elif tar_path.endswith(".bz2"):
        mode = "w:bz2"
    elif tar_path.endswith(".xz"):
        mode = "w:xz"
    else:
        mode = "w"
    return mode
 
def tar_dir(dir_path: str, tar_path: str, **kwargs):
    """ Tar a directory `dir_path` to a tar file. """
    mode = __tar_mode_helper(tar_path)
    with tarfile.open(tar_path, mode) as tar:
        tar.add(dir_path, arcname=os.path.basename(dir_path), **kwargs)
 
def tar_files(tar_path: str, *files):
    mode = __tar_mode_helper(tar_path)
    with tarfile.open(tar_path, mode) as tar:
        for fn in filter(lambda x: os.path.exists(x), files):
            tar.add(fn, arcname=os.path.basename(fn))
 
def untar_file(tar_path, target_dir):
    with tarfile.open(tar_path, "r") as tar:
        tar.extractall(target_dir)
 
def test_tar():
    tar_dir("./", "/home/bingbing.hu/my.tar.gz")
    untar_file("/home/bingbing.hu/my.tar.gz", "/home/bingbing.hu/mytar_gz_extracted")
 
 
if __name__ == "__main__":
    test_tar()

示例

from common import *
import os
import tarfile
 
 
class CompileTask1604(object):
    """ Compile on ubuntu 16.04. """
    def __init__(self) -> None:
        self.conn = SSHConnection()
        self.conn.re_init(N.NewApiDevNode)
 
    def destroy(self):
        if self.conn is not None:
            self.conn.close()
        self.conn = None
 
    @staticmethod
    def filter_src_files(tarinfo: tarfile.TarInfo):
        name = tarinfo.name
        if '.git' in name or ".vscode" in name or ".cache" in name:
            return None
        if name.startswith('.'):
            return None
        if tarinfo.isdir():
            if "3rd" in name or "build" in name or "doc" in name or "obj" in name or "bin" in name:
                return None
            return tarinfo
        if tarinfo.isfile() and (
            name.endswith(".h") or name.endswith(".cpp")
        ):
            return tarinfo
        return None
 
    def compile_mdl_src(self):
        conn = self.conn
 
        tar_name = "mdl_src.tar.bz2"
        utils.tar_dir(MDL_SRC, tar_name, filter=self.__class__.filter_src_files)
        conn.upload(tar_name, "./code/")
        conn.excute(f"""
cd code
tar -xavf {tar_name}
rm -f {tar_name}
echo; echo
cd mdl_src/build/make
make -j4
""")
        os.remove(tar_name)
 
    def compile_dbalchemy(self):
        conn = self.conn
 
        def _filter(tarinfo: tarfile.TarInfo):
            name = tarinfo.name
            if 'build.sh' in name or "makefile" in name:
                return tarinfo
            return self.__class__.filter_src_files(tarinfo)
 
        tar_name = "dbalchemy.tar.bz2"
        utils.tar_dir(DBALCHEMY, tar_name, filter=_filter)
        conn.upload(tar_name, "./code/")
        conn.excute(f"""
set -e
cd code
tar -xavf {tar_name}
rm -f {tar_name}
echo; echo
cd dbalchemy/
bash -e build.sh
""")
        os.remove(tar_name)
 
 
if __name__ == '__main__':
    task = CompileTask1604()
    task.compile_mdl_src()
    task.destroy()