# tbot, Embedded Automation Tool
# Copyright (C) 2018 Harald Seiler
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import enum
import io
import itertools
import json
import os
import sys
import time
import typing
import termcolor2
# Unicode box-drawing characters are only used when stdout advertises UTF-8.
# Codec names are case-insensitive and may be spelled "UTF-8", "utf-8",
# "utf8" or "utf_8"; the attribute may also be missing or None when stdout
# has been replaced.  Normalise the name before comparing.
IS_UNICODE = (
    (getattr(sys.stdout, "encoding", None) or "")
    .upper()
    .replace("-", "")
    .replace("_", "")
) == "UTF8"

# Colour policy, taken from https://bixense.com/clicolors/ :
#   * CLICOLOR_FORCE set to anything but "0" always enables colour.
#   * Otherwise colour is enabled on a TTY unless CLICOLOR is "0".
IS_COLOR = bool(
    (os.getenv("CLICOLOR", "1") != "0" and sys.stdout.isatty())
    or os.getenv("CLICOLOR_FORCE", "0") != "0"
)
def u(with_unicode: str, without_unicode: str) -> str:
    """
    Pick between a unicode string and its plain fallback.

    The choice is driven by the module-level ``IS_UNICODE`` flag.

    :param str with_unicode: Returned when unicode output is available
    :param str without_unicode: Returned when unicode output is **not** available
    :rtype: str
    :returns: One of the two arguments, unchanged
    """
    return with_unicode if IS_UNICODE else without_unicode
class _C(termcolor2._C):
    """Colored string that honours the module-level ``IS_COLOR`` switch."""

    def __str__(self) -> str:
        # With color disabled, hand back the ``string`` attribute as-is
        # instead of the parent's rendering.
        if not IS_COLOR:
            return self.string
        return super().__str__()


# Short alias for building colored strings.
c = _C
# Either the local wrapper or a plain termcolor2 colored string.
_TC = typing.Union[_C, termcolor2._C]
@enum.unique
class Verbosity(enum.IntEnum):
    """
    Verbosity levels.

    Members are integers in ascending order of chattiness, so they can be
    compared with ``<`` / ``>``.
    """

    QUIET = 0
    INFO = enum.auto()
    COMMAND = enum.auto()
    STDOUT = enum.auto()
    CHANNEL = enum.auto()

    def __str__(self) -> str:
        """
        Return the bare member name, e.g. ``"INFO"``.

        The name is taken from ``self.name`` rather than parsed out of the
        inherited ``__str__``: since Python 3.11 ``IntEnum.__str__`` yields
        the integer value (``"1"``) instead of ``"Verbosity.INFO"``.
        """
        return self.name
# Current nesting depth; EventIO repeats the vertical-bar prefix this many
# times in front of every printed line.
NESTING = 0
# NOTE(review): not read anywhere in this section - presumably flags an
# interactive session; confirm against callers.
INTERACTIVE = False
# Events whose verbosity is above this level are not printed to stdout.
VERBOSITY = Verbosity.COMMAND
# Optional text stream; when set, EventIO.close() appends each event to it
# as a JSON object.
LOGFILE: typing.Optional[typing.TextIO] = None
# Reference point (monotonic clock) for the "time" field of logged events.
START_TIME = time.monotonic()
class EventIO(io.StringIO):
    """
    Stream for a log event.

    Text written to the stream is buffered.  Each completed line is printed
    to stdout behind a nesting prefix, unless the event's verbosity is above
    the global ``VERBOSITY``.  When the event is closed and ``LOGFILE`` is
    set, the event's type, timestamp and data are appended to it as JSON.
    """

    # Terminal control sequences (and the backspace character) removed from
    # written text, listed in the order in which they are stripped.
    _STRIPPED_SEQUENCES = (
        "\x1B[H",
        "\x1B[2J",
        "\x1B[K",
        "\x1B[r",
        "\x1B[u",
        "\x08",
    )

    def __init__(
        self,
        ty: typing.List[str],
        initial: typing.Union[str, _TC, None] = None,
        *,
        verbosity: Verbosity = Verbosity.INFO,
        nest_first: typing.Optional[str] = None,
        **kwargs: typing.Any,
    ) -> None:
        """
        Create a log event.

        A log event is a :class:`io.StringIO` and everything written to
        the stream will be added to the log event.

        :param list ty: Event type, stored as ``"type"`` in the logfile entry
        :param str initial: Optional first line of the log event
        :param Verbosity verbosity: Level of this event; it is only printed
            if this is not above the global ``VERBOSITY``
        :param str nest_first: Prefix for the first printed line, replacing
            the default branch marker
        :param kwargs: Additional data, stored as ``"data"`` in the logfile
            entry
        """
        super().__init__("")

        # Offset into the buffer up to which text has already been printed.
        self.cursor = 0
        # True until the first line of this event has been prefixed.
        self.first = True
        # Optional extra text inserted after the nesting prefix.
        self.prefix: typing.Optional[str] = None
        self.nest_first = nest_first or u("├─", "+-")
        self.verbosity = verbosity
        self.ty = ty
        self.data = kwargs

        if initial:
            self.writeln(str(initial))

    def _prefix(self) -> str:
        """
        Build the prefix for the next printed line.

        The first line of an event gets ``nest_first``, all following lines
        a continuation bar.  Calling this marks the first line as consumed.
        """
        after = self.nest_first if self.first else u("│ ", "| ")
        self.first = False
        prefix: str = self.prefix or ""
        return str(c(u("│ ", "| ") * NESTING + after).dark) + prefix

    def _print_lines(self, last: bool = False) -> None:
        """
        Print all buffered lines that have not been printed yet.

        :param bool last: Also print a trailing partial line (one without a
            terminating newline)
        """
        if self.verbosity > VERBOSITY:
            return

        buf = self.getvalue()[self.cursor :]
        while "\n" in buf:
            line, buf = buf.split("\n", maxsplit=1)
            print(self._prefix() + c(line))
            # +1 accounts for the newline that was split off.
            self.cursor += len(line) + 1

        if last and buf != "":
            print(self._prefix() + buf)
            # Mark the remainder as printed so it can never be emitted twice.
            self.cursor += len(buf)

    def writeln(self, s: typing.Union[str, _TC]) -> int:
        """
        Add a line to this log event.

        :param str s: The line, without trailing newline; colored strings
            are converted with ``str()`` first
        :rtype: int
        :returns: Number of characters written
        """
        return self.write(str(s) + "\n")

    def write(self, s: str) -> int:
        r"""
        Add some text to this log event.

        A fixed set of terminal control sequences and backspace characters
        is stripped from the text before it is stored.

        Printing to stdout will only occur once a newline ``"\n"`` is
        written.

        :rtype: int
        :returns: Number of characters stored after stripping
        """
        for sequence in self._STRIPPED_SEQUENCES:
            s = s.replace(sequence, "")
        res = super().write(s)
        self._print_lines()
        return res

    def __enter__(self) -> "EventIO":
        return self

    def close(self) -> None:
        """
        Finalize this log event.

        Prints any remaining partial line and, if ``LOGFILE`` is set,
        appends the event to it.  No more text can be added to this log
        event after closing it.  Calling this on an already closed event
        does nothing.
        """
        if self.closed:
            # Keep close() idempotent, as io.IOBase.close() is.
            return

        try:
            self._print_lines(last=True)

            if LOGFILE is not None:
                ev = {
                    "type": self.ty,
                    "time": time.monotonic() - START_TIME,
                    "data": self.data,
                }
                json.dump(ev, LOGFILE, indent=2)
                LOGFILE.write("\n")
                LOGFILE.flush()
        finally:
            # Always release the buffer, even if printing or logging failed,
            # so __del__ does not retry a half-finished close.
            super().close()

    def __del__(self) -> None:
        # Make sure an event that was never closed explicitly is still
        # finalized.
        if not self.closed:
            self.close()
def message(
    msg: typing.Union[str, _TC], verbosity: Verbosity = Verbosity.INFO
) -> EventIO:
    """
    Log a message.

    The message becomes the first line of a new log event.  The event is
    returned without being closed.

    :param str msg: The message
    :param Verbosity verbosity: Message verbosity
    :rtype: EventIO
    :returns: The log event created for the message
    """
    event_type = ["msg", str(verbosity)]
    plain_text = str(msg)
    return EventIO(event_type, msg, verbosity=verbosity, text=plain_text)
def warning(msg: typing.Union[str, _TC]) -> EventIO:
    """
    Emit a warning message.

    The message is prefixed with a bold yellow ``Warning`` label and logged
    at :attr:`Verbosity.QUIET`.

    :param str msg: The message
    :rtype: EventIO
    :returns: The log event created for the warning

    .. versionadded:: 0.6.3
    """
    label = c("Warning").yellow.bold + ": "
    return message(label + msg, Verbosity.QUIET)