Source code for tbot.machine.channel.channel

# tbot, Embedded Automation Tool
# Copyright (C) 2018  Harald Seiler
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import abc
import collections
import io
import itertools
import re
import select
import sys
import termios
import time
import tty
import typing

import tbot
from tbot.machine.linux import shell

TBOT_PROMPT = "TBOT-VEJPVC1QUk9NUFQK$ "


[docs]class ChannelClosedException(Exception): """Exception when attempting to interact with a closed channel.""" pass
[docs]class SkipStream(io.StringIO): """Stream wrapper that skips a few character at the start.""" def __init__(self, stream: typing.TextIO, n: int) -> None: """ Create a new SkipStream that skips ``n`` chars. :param io.TextIOBase stream: The underlying stream. The first ``n`` chars written to this SkipStream will not be written to ``stream``. :param int n: Number of characters to skip. """ self.stream = stream self.n = n
[docs] def write(self, s: str) -> int: """Write some string to this stream.""" if self.n > 0: if self.n > len(s): self.n -= len(s) return len(s) else: s = s[self.n :] n = self.n self.n = 0 return self.stream.write(s) + n else: return self.stream.write(s)
[docs]class Channel(abc.ABC): """Generic channel."""
[docs] @abc.abstractmethod def send(self, data: typing.Union[bytes, str]) -> None: """ Send some data to this channel. :param bytes, str data: Data to be sent. It data is a :class:`str` it will be encoded using ``utf-8``. :raises ChannelClosedException: If the cannel is no longer open. """ pass
[docs] @abc.abstractmethod def recv( self, timeout: typing.Optional[float] = None, max: typing.Optional[int] = None ) -> bytes: """ Receive some data from this channel. ``recv`` will block until at least one byte is available or the timeout is reached. .. warning:: ``recv`` does in no way attempt to ensure that bytes can be decoded as unicode. It is very well possible that recv returns after half a unicode sequence. Code using ``recv`` needs to be robust in regards to this. Because of this, ``recv`` should only be used if no other method of :class:`~tbot.machine.channel.Channel` suits your needs. :param float timeout: Optional timeout after which ``recv`` should return if no data is avalable. :param int max: Optional maximum number of bytes to read. :raises ChannelClosedException: If the channel is no longer open. :raises TimeoutError: If the timeout was reached before data got available. """ pass
[docs] @abc.abstractmethod def close(self) -> None: """ Close this channel. An Implementation of ``close`` must call ``Channel.cleanup`` before closing the channel if it is still open. Calls to ``send``/``recv`` must fail after calling ``close``. """ pass
[docs] @abc.abstractmethod def fileno(self) -> int: """ Return the filedescriptor of this channel. :rtype: int """ pass
[docs] @abc.abstractmethod def isopen(self) -> bool: """ Return whether this channel is still open. .. warning:: The result of this call should only be taken as a hint, as the remote end might unexpectedly close the channel shortly after the call. :rtype: bool """ pass
def _interactive_setup(self) -> None: """Do some setup before making a channel interactive.""" pass def _interactive_teardown(self) -> None: """Teardown after returning from an interactive session.""" pass
[docs] def register_cleanup(self, clean: "typing.Callable[[Channel], None]") -> None: """Register a cleanup function for this channel.""" def cleanup() -> None: clean(self) self.cleanup = cleanup
[docs] def attach_interactive( self, end_magic: typing.Union[str, bytes, None] = None ) -> None: """ Connect tbot's terminal to this channel. Allows the user to interact directly with whatever this channel is connected to. :param str, bytes end_magic: String that when read should end the interactive session. """ end_magic_bytes = ( end_magic.encode("utf-8") if isinstance(end_magic, str) else end_magic ) end_ring_buffer: typing.Deque[int] = collections.deque( maxlen=len(end_magic_bytes) if end_magic_bytes is not None else 1 ) previous: typing.Deque[int] = collections.deque(maxlen=3) oldtty = termios.tcgetattr(sys.stdin) try: tty.setraw(sys.stdin.fileno()) tty.setcbreak(sys.stdin.fileno()) mode = termios.tcgetattr(sys.stdin) special_chars = mode[6] assert isinstance(special_chars, list) special_chars[termios.VMIN] = b"\0" special_chars[termios.VTIME] = b"\0" termios.tcsetattr(sys.stdin, termios.TCSAFLUSH, mode) self._interactive_setup() while True: r, _, _ = select.select([self, sys.stdin], [], []) if self in r: data = self.recv() if isinstance(end_magic_bytes, bytes): end_ring_buffer.extend(data) for a, b in itertools.zip_longest( end_ring_buffer, end_magic_bytes ): if a != b: break else: break sys.stdout.buffer.write(data) sys.stdout.buffer.flush() if sys.stdin in r: data = sys.stdin.buffer.read(4096) previous.extend(data) if end_magic is None and data == b"\x04": break for a, b in itertools.zip_longest(previous, b"\r~."): if a != b: break else: break self.send(data) sys.stdout.write("\r\n") finally: termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty) self._interactive_teardown()
[docs] def initialize(self, *, sh: typing.Type[shell.Shell] = shell.Bash) -> None: """ Initialize this channel so it is ready to receive commands. Internally runs commands to set the prompt to a known value and disable history + line-editing. :param tbot.machine.linux.shell.Shell sh: Type of the Shell this channel is connected to. """ # Set proper prompt self.raw_command(sh.set_prompt(TBOT_PROMPT)) # Ensure we don't make history cmd = sh.disable_history() if cmd is not None: self.raw_command(cmd) # Disable line editing cmd = sh.disable_editing() if cmd is not None: self.raw_command(cmd) # Ensure multiline commands work cmd = sh.set_prompt2("") if cmd is not None: self.raw_command(cmd)
def __init__(self) -> None: """Create a new channel.""" self.cleanup: typing.Callable[[], None] = lambda: None self.initialize()
[docs] def recv_n(self, n: int, timeout: typing.Optional[float] = None) -> bytes: """ Receive exactly N bytes. Return exactly N bytes or raise an exception if the channel closed/the timeout was reached. :param int n: Number of bytes :param float timeout: Optional timeout """ start_time = time.monotonic() buf = b"" while len(buf) < n: buf += self.recv(timeout=timeout, max=(n - len(buf))) if timeout is not None: timeout = timeout - (time.monotonic() - start_time) return buf
def _debug_log(self, data: bytes, out: bool = False) -> None: if tbot.log.VERBOSITY >= tbot.log.Verbosity.CHANNEL: json_data: str try: json_data = data.decode("utf-8") except UnicodeDecodeError: json_data = data.decode("latin1") msg = tbot.log.c(repr(data)[1:]) tbot.log.EventIO( ["__debug__"], ( tbot.log.c("> ").blue.bold + msg.blue if out else tbot.log.c("< ").yellow.bold + msg.yellow ), verbosity=tbot.log.Verbosity.CHANNEL, direction="send" if out else "recv", data=json_data, )
[docs] def read_until_prompt( self, prompt: str, *, regex: bool = False, stream: typing.Optional[typing.TextIO] = None, timeout: typing.Optional[float] = None, must_end: bool = True, ) -> str: """ Read until receiving ``prompt``. :param str prompt: The prompt to wait for. If ``regex`` is ``True``, this will be interpreted as a regular expression. :param bool regex: Whether the prompt should be interpreted as is or as a regular expression. In the latter case, a ``'$'`` will be added to the end of the expression. :param io.TextIOBase stream: Optional stream where ``read_until_prompt`` should write everything received up until the prompt is detected. :param float timeout: Optional timeout. :param bool must_end: Whether the prompt has to appear at the end of the stream. :raises TimeoutError: If a timeout is set and this timeout is reached before the prompt is detected. :rtype: str :returns: Everything read up until the prompt. """ start_time = time.monotonic() expr = None if regex: expr = f"{prompt}$" if must_end else prompt elif not must_end: expr = re.escape(prompt) buf = "" timeout_remaining = timeout while True: new = self.recv(timeout=timeout_remaining) decoded = "" for _ in range(10): try: decoded += new.decode("utf-8") break except UnicodeDecodeError: try: new += self.recv(timeout=0.1) except TimeoutError: pass else: decoded += new.decode("latin_1") decoded = ( decoded.replace("\r\n", "\n").replace("\r\n", "\n").replace("\r", "\n") ) buf += decoded if (expr is None and buf[-len(prompt) :] == prompt) or ( expr is not None and re.search(expr, buf) is not None ): if stream: # Don't clip prompt if it doesn't need to be at the end if must_end: stream.write(decoded[: -len(prompt)]) else: stream.write(decoded) break elif stream is not None: stream.write(decoded) if timeout is not None: current_time = time.monotonic() timeout_remaining = max(timeout - (current_time - start_time), 0) return buf
[docs] def raw_command( self, command: str, *, prompt: str = TBOT_PROMPT, stream: typing.Optional[typing.TextIO] = None, timeout: typing.Optional[float] = None, ) -> str: """ Send a command to this channel and wait until it finishes. :param str command: The command without a trailing newline :param str prompt: The prompt of the shell on this channel. This is needed to detect when the command is done. :param io.TextIOBase stream: Optional stream where the commands output should be written. :param float timeout: Optional timeout. :raises TimeoutError: If the timeout was reached before the command finished. :rtype: str :returns: The ouput of the command. Will contain a trailing newline unless the command did not send one (eg. ``printf``) """ self.send(f"{command}\n".encode("utf-8")) if stream: stream = SkipStream(stream, len(command) + 1) out = self.read_until_prompt(prompt, stream=stream, timeout=timeout)[ len(command) + 1 : -len(prompt) ] return out
[docs] def raw_command_with_retval( self, command: str, *, prompt: str = TBOT_PROMPT, retval_check_cmd: str = "echo $?", stream: typing.Optional[typing.TextIO] = None, timeout: typing.Optional[float] = None, ) -> typing.Tuple[int, str]: """ Send a command to this channel, wait until it finishes, and check its retcode. :param str command: The command without a trailing newline :param str prompt: The prompt of the shell on this channel. This is needed to detect when the command is done. :param str retval_check_cmd: Command used to check the exit code. :param io.TextIOBase stream: Optional stream where the commands output should be written. :param float timeout: Optional timeout. :raises TimeoutError: If the timeout was reached before the command finished. :rtype: tuple[int, str] :returns: The retcode and ouput of the command. Will contain a trailing newline unless the command did not send one (eg. ``printf``) """ out = self.raw_command(command, prompt=prompt, stream=stream, timeout=timeout) retval = int( self.raw_command(retval_check_cmd, prompt=prompt, timeout=timeout).strip() ) return retval, out