#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import division, print_function
import datetime
import io
import logging
import math
import multiprocessing as mp
from multiprocessing.sharedctypes import Synchronized
import os
import sys
import signal
import subprocess as sp
import threading
import time
import traceback
import warnings
_IPYTHON = True
try:
import ipywidgets
except:
_IPYTHON = False
warnings.warn("could not load ipywidgets (IPython HTML output will not work)", category=ImportWarning)
try:
from IPython.display import display
except:
_IPYTHON = False
warnings.warn("could not load IPython (IPython HTML output will not work)", category=ImportWarning)
def_handl = logging.StreamHandler(stream = sys.stderr) # the default handler simply uses stderr
def_handl.setLevel(logging.DEBUG) # ... listens to all messaged
fmt = MultiLineFormatter('%(asctime)s %(name)s %(levelname)s : %(message)s')
def_handl.setFormatter(fmt) # ... and pads multiline messaged
log = logging.getLogger(__name__) # creates the default log for this module
log.addHandler(def_handl)
if sys.version_info[0] == 2: # minor hacks to be python 2 and 3 compatible
ProcessLookupError = OSError
inMemoryBuffer = io.BytesIO
elif sys.version_info[0] == 3:
inMemoryBuffer = io.StringIO
[docs]class StdoutPipe(object):
"""replacement for stream objects such as stdout which
forwards all incoming data using the send method of a
connection
example usage:
>>> import sys
>>> from multiprocessing import Pipe
>>> from progression import StdoutPipe
>>> conn_recv, conn_send = Pipe(False)
>>> sys.stdout = StdoutPipe(conn_send)
>>> print("hallo welt", end='') # this is no going through the pipe
>>> msg = conn_recv.recv()
>>> sys.stdout = sys.__stdout__
>>> print(msg)
hallo welt
>>> assert msg == "hallo welt"
"""
def __init__(self, conn):
self.conn = conn
[docs] def write(self, b):
self.conn.send(b)
[docs]class PipeToPrint(object):
def __call__(self, b):
print(b, end='')
PipeHandler = PipeToPrint
[docs]def choose_pipe_handler(kind = 'print', color_theme = None):
global PipeHandler
if kind == 'print':
PipeHandler = PipeToPrint
if color_theme is None:
choose_color_theme('term_default')
else:
choose_color_theme(color_theme)
elif kind == 'ipythonhtml':
if _IPYTHON:
PipeHandler = PipeFromProgressToIPythonHTMLWidget
if color_theme is None:
choose_color_theme('ipyt_default')
else:
choose_color_theme(color_theme)
else:
warnings.warn("can not choose ipythonHTML (IPython and/or ipywidgets were not loaded)")
try:
from shutil import get_terminal_size as shutil_get_terminal_size
except ImportError:
shutil_get_terminal_size = None
if sys.version_info[0] == 2:
old_math_ceil = math.ceil
def my_int_ceil(f):
return int(old_math_ceil(f))
math.ceil = my_int_ceil
# Magic conversion from 3 to 2
if sys.version_info[0] == 2:
_jm_compatible_bytearray = lambda x: x
else:
_jm_compatible_bytearray = bytearray
[docs]class LoopExceptionError(RuntimeError):
pass
[docs]class LoopInterruptError(Exception):
pass
[docs]def get_identifier(name=None, pid=None, bold=True):
if pid is None:
pid = os.getpid()
if bold:
esc_bold = ESC_BOLD
esc_no_char_attr = ESC_NO_CHAR_ATTR
else:
esc_bold = ""
esc_no_char_attr = ""
if name is None:
return "{}PID {}{}".format(esc_bold, pid, esc_no_char_attr)
else:
return "{}{} ({}){}".format(esc_bold, name, pid, esc_no_char_attr)
[docs]class Loop(object):
"""
class to run a function periodically an seperate process.
In case the called function returns True, the loop will stop.
Otherwise a time interval given by interval will be slept before
another execution is triggered.
The shared memory variable _run (accessible via the class property run)
also determines if the function if executed another time. If set to False
the execution stops.
For safe cleanup (and in order to catch any Errors)
it is advisable to instantiate this class
using 'with' statement as follows:
with Loop(**kwargs) as my_loop:
my_loop.start()
...
this will guarantee you that the spawned loop process is
down when exiting the 'with' scope.
The only circumstance where the process is still running is
when you set auto_kill_on_last_resort to False and answer the
question to send SIGKILL with no.
"""
def __init__(self,
func,
args = (),
interval = 1,
verbose = None,
sigint = 'stop',
sigterm = 'stop',
auto_kill_on_last_resort = False,
raise_error = True):
"""
func [callable] - function to be called periodically
args [tuple] - arguments passed to func when calling
intervall [pos number] - time to "sleep" between each call
verbose - DEPRECATED, only kept for compatibility, use global log.level to
specify verbosity
sigint [string] - signal handler string to set SIGINT behavior (see below)
sigterm [string] - signal handler string to set SIGTERM behavior (see below)
auto_kill_on_last_resort [bool] - If set False (default), ask user to send SIGKILL
to loop process in case normal stop and SIGTERM failed. If set True, send SIDKILL
without asking.
the signal handler string may be one of the following
ing: ignore the incoming signal
stop: raise InterruptedError which is caught silently.
"""
self._proc = None
if verbose is not None:
log.warning("verbose is deprecated, only allowed for compatibility")
warnings.warn("verbose is deprecated", DeprecationWarning)
self.func = func
self.args = args
self.interval = interval
assert self.interval >= 0
self._run = mp.Value('b', False)
self._pause = mp.Value('b', False)
self._sigint = sigint
self._sigterm = sigterm
self._auto_kill_on_last_resort = auto_kill_on_last_resort
log.debug("auto_kill_on_last_resort = %s", self._auto_kill_on_last_resort)
self._monitor_thread = None
self.pipe_handler = PipeHandler()
self.raise_error = raise_error
def __enter__(self):
return self
def __exit__(self, *exc_args):
if self.is_alive():
log.debug("loop is still running on context exit")
else:
log.debug("loop has stopped on context exit")
self.stop()
def __cleanup(self):
"""
Wait at most twice as long as the given repetition interval
for the _wrapper_function to terminate.
If after that time the _wrapper_function has not terminated,
send SIGTERM to and the process.
Wait at most five times as long as the given repetition interval
for the _wrapper_function to terminate.
If the process still running send SIGKILL automatically if
auto_kill_on_last_resort was set True or ask the
user to confirm sending SIGKILL
"""
# set run to False and wait some time -> see what happens
self.run = False
if check_process_termination(proc = self._proc,
timeout = 2*self.interval,
prefix = '',
auto_kill_on_last_resort = self._auto_kill_on_last_resort):
log.debug("cleanup successful")
else:
raise RuntimeError("cleanup FAILED!")
try:
self.conn_send.close()
except OSError:
pass
log.debug("wait for monitor thread to join")
self._monitor_thread.join()
@staticmethod
def _wrapper_func(func, args, shared_mem_run, shared_mem_pause, interval, log_queue, sigint, sigterm, name, logging_level, conn_send):
"""
to be executed as a separate process (that's why this functions is declared static)
"""
prefix = get_identifier(name)+' '
global log
log = logging.getLogger(__name__+'.'+"log_{}".format(get_identifier(name, bold=False)))
log.setLevel(logging_level)
# try:
# log.addHandler(QueueHandler(log_queue))
# except NameError:
# log.addHandler(def_handl)
sys.stdout = StdoutPipe(conn_send)
log.debug("enter wrapper_func")
SIG_handler_Loop(sigint, sigterm, log, prefix)
while shared_mem_run.value:
try:
# in pause mode, simply sleep
if shared_mem_pause.value:
quit_loop = False
else:
# if not pause mode -> call func and see what happens
try:
quit_loop = func(*args)
except LoopInterruptError:
raise
except Exception as e:
log.error("error %s occurred in loop alling 'func(*args)'", type(e))
log.info("show traceback.print_exc()\n%s", traceback.format_exc())
sys.exit(-1)
if quit_loop is True:
log.debug("loop stooped because func returned True")
break
time.sleep(interval)
except LoopInterruptError:
log.debug("quit wrapper_func due to InterruptedError")
break
log.debug("wrapper_func terminates gracefully")
def _monitor_stdout_pipe(self):
while True:
try:
b = self.conn_recv.recv()
self.pipe_handler(b)
except EOFError:
break
[docs] def start(self):
"""
uses multiprocess Process to call _wrapper_func in subprocess
"""
if self.is_alive():
log.warning("a process with pid %s is already running", self._proc.pid)
return
self.run = True
# try:
# log_queue = mp.Queue(-1)
# listener = QueueListener(log_queue, def_handl)
# listener.start()
# except NameError:
# log.error("QueueListener not available in this python version (need at least 3.2)\n"
# "this may resault in incoheerent logging")
# log_queue = None
log_queue = None
name = self.__class__.__name__
self.conn_recv, self.conn_send = mp.Pipe(False)
self._monitor_thread = threading.Thread(target = self._monitor_stdout_pipe)
self._monitor_thread.daemon=True
self._monitor_thread.start()
log.debug("started monitor thread")
self._proc = mp.Process(target = Loop._wrapper_func,
args = (self.func, self.args, self._run, self._pause, self.interval,
log_queue, self._sigint, self._sigterm, name, log.level, self.conn_send))
self._proc.start()
log.debug("started a new process with pid %s", self._proc.pid)
[docs] def stop(self):
"""
stops the process triggered by start
Setting the shared memory boolean run to false, which should prevent
the loop from repeating. Call __cleanup to make sure the process
stopped. After that we could trigger start() again.
"""
if self.is_alive():
self._proc.terminate()
if self._proc is not None:
self.__cleanup()
if self.raise_error:
if self._proc.exitcode == 255:
raise LoopExceptionError("the loop function return non zero exticode!\n"+
"see log (INFO level) for traceback information")
self._proc = None
[docs] def join(self, timeout):
"""
calls join for the spawned process with given timeout
"""
if self.is_alive():
self._proc.join(timeout)
[docs] def is_alive(self):
if self._proc is None:
return False
else:
return self._proc.is_alive()
[docs] def pause(self):
if self.run:
self._pause.value = True
log.debug("process with pid %s paused", self._proc.pid)
[docs] def resume(self):
if self.run:
self._pause.value = False
log.debug("process with pid %s resumed", self._proc.pid)
[docs] def getpid(self):
if self._proc is not None:
return self._proc.pid
else:
return None
@property
def run(self):
"""
makes the shared memory boolean accessible as class attribute
Set run False, the loop will stop repeating.
Calling start, will set run True and start the loop again as a new process.
"""
return self._run.value
@run.setter
def run(self, run):
self._run.value = run
[docs]class Progress(Loop):
"""
Abstract Progress Loop
Uses Loop class to implement a repeating function to display the progress
of multiples processes.
In the simplest case, given only a list of shared memory values 'count' (NOTE:
a single value will be automatically mapped to a one element list),
the total elapses time (TET) and a speed estimation are calculated for
each process and passed to the display function show_stat.
This functions needs to be implemented in a subclass. It is intended to show
the progress of ONE process in one line.
When max_count is given (in general as list of shared memory values, a single
shared memory value will be mapped to a one element list) the time to go TTG
will also be calculated and passed tow show_stat.
Information about the terminal width will be retrieved when setting width='auto'.
If supported by the terminal emulator the width in characters of the terminal
emulator will be stored in width and also passed to show_stat.
Otherwise, a default width of 80 characters will be chosen.
Also you can specify a fixed width by setting width to the desired number.
NOTE: in order to achieve multiline progress special escape sequences are used
which may not be supported by all terminal emulators.
example:
c1 = UnsignedIntValue(val=0)
c2 = UnsignedIntValue(val=0)
c = [c1, c2]
prepend = ['c1: ', 'c2: ']
with ProgressCounter(count=c, prepend=prepend) as sc:
sc.start()
while True:
i = np.random.randint(0,2)
with c[i].get_lock():
c[i].value += 1
if c[0].value == 1000:
break
time.sleep(0.01)
using start() within the 'with' statement ensures that the subprocess
which is started by start() in order to show the progress repeatedly
will be terminated on context exit. Otherwise one has to make sure
that at some point the stop() routine is called. When dealing with
signal handling and exception this might become tricky, so that the
use of the 'with' statement is highly encouraged.
"""
def __init__(self,
count,
max_count = None,
prepend = None,
width = 'auto',
speed_calc_cycles = 10,
interval = 1,
verbose = None,
sigint = 'stop',
sigterm = 'stop',
info_line = None):
"""
count [mp.Value] - shared memory to hold the current state, (list or single value)
max_count [mp.Value] - shared memory holding the final state, (None, list or single value),
may be changed by external process without having to explicitly tell this class.
If None, no TTG and relative progress can be calculated -> TTG = None
prepend [string] - string to put in front of the progress output, (None, single string
or of list of strings)
interval [int] - seconds to wait between progress print
speed_calc_cycles [int] - use the current (time, count) as
well as the (old_time, old_count) read by the show_stat function
speed_calc_cycles calls before to calculate the speed as follows:
s = count - old_count / (time - old_time)
verbose, sigint, sigterm -> see loop class
"""
if verbose is not None:
log.warning("verbose is deprecated, only allowed for compatibility")
warnings.warn("verbose is deprecated", DeprecationWarning)
try:
for c in count:
if not isinstance(c, Synchronized):
raise ValueError("Each element of 'count' must be if the type multiprocessing.sharedctypes.Synchronized")
self.is_multi = True
except TypeError:
if not isinstance(count, Synchronized):
raise ValueError("'count' must be if the type multiprocessing.sharedctypes.Synchronized")
self.is_multi = False
count = [count]
self.len = len(count)
if max_count is not None:
if self.is_multi:
try:
for i, m in enumerate(max_count):
if not isinstance(m, Synchronized):
max_count[i] = UnsignedIntValue(m)
except TypeError:
raise TypeError("'max_count' must be iterable")
else:
if not isinstance(max_count, Synchronized):
max_count = UnsignedIntValue(max_count)
max_count = [max_count]
else:
max_count = [None] * self.len
self.start_time = []
self.speed_calc_cycles = speed_calc_cycles
self.width = width
self.q = []
self.prepend = []
self.lock = []
self.last_count = []
self.last_speed = []
for i in range(self.len):
self.q.append(myQueue()) # queue to save the last speed_calc_cycles
# (time, count) information to calculate speed
self.last_count.append(UnsignedIntValue())
self.last_speed.append(FloatValue())
self.lock.append(mp.Lock())
self.start_time.append(FloatValue(val=time.time()))
if prepend is None:
# no prepend given
self.prepend.append('')
else:
if isinstance(prepend, str):
self.prepend.append(prepend)
else:
# assume list of prepend, (needs to be a sequence)
self.prepend.append(prepend[i])
self.max_count = max_count # list of multiprocessing value type
self.count = count # list of multiprocessing value type
self.interval = interval
self.verbose = verbose
self.show_on_exit = False
self.add_args = {}
self.info_line = info_line
# setup loop class with func
Loop.__init__(self,
func = Progress.show_stat_wrapper_multi,
args = (self.count,
self.last_count,
self.start_time,
self.max_count,
self.speed_calc_cycles,
self.width,
self.q,
self.last_speed,
self.prepend,
self.__class__.show_stat,
self.len,
self.add_args,
self.lock,
self.info_line),
interval = interval,
sigint = sigint,
sigterm = sigterm,
auto_kill_on_last_resort = True)
def __exit__(self, *exc_args):
self.stop()
@staticmethod
def _calc(count,
last_count,
start_time,
max_count,
speed_calc_cycles,
q,
last_speed,
lock):
"""
do the pre calculations in order to get TET, speed, TTG
and call the actual display routine show_stat with these arguments
NOTE: show_stat is purely abstract and need to be reimplemented to
achieve a specific progress display.
"""
count_value = count.value
start_time_value = start_time.value
current_time = time.time()
if last_count.value != count_value:
# some progress happened
with lock:
# save current state (count, time) to queue
q.put((count_value, current_time))
# get older state from queue (or initial state)
# to to speed estimation
if q.qsize() > speed_calc_cycles:
old_count_value, old_time = q.get()
else:
old_count_value, old_time = 0, start_time_value
last_count.value = count_value
#last_old_count.value = old_count_value
#last_old_time.value = old_time
speed = (count_value - old_count_value) / (current_time - old_time)
last_speed.value = speed
else:
# progress has not changed since last call
# use also old (cached) data from the queue
#old_count_value, old_time = last_old_count.value, last_old_time.value
speed = last_speed.value
if (max_count is None):
max_count_value = None
else:
max_count_value = max_count.value
tet = (current_time - start_time_value)
if (speed == 0) or (max_count_value is None) or (max_count_value == 0):
ttg = None
else:
ttg = math.ceil((max_count_value - count_value) / speed)
return count_value, max_count_value, speed, tet, ttg
def _reset_all(self):
"""
reset all progress information
"""
for i in range(self.len):
self._reset_i(i)
def _reset_i(self, i):
"""
reset i-th progress information
"""
self.count[i].value=0
log.debug("reset counter %s", i)
self.lock[i].acquire()
for x in range(self.q[i].qsize()):
self.q[i].get()
self.lock[i].release()
self.start_time[i].value = time.time()
def _show_stat(self):
"""
convenient functions to call the static show_stat_wrapper_multi with
the given class members
"""
Progress.show_stat_wrapper_multi(self.count,
self.last_count,
self.start_time,
self.max_count,
self.speed_calc_cycles,
self.width,
self.q,
self.last_speed,
self.prepend,
self.__class__.show_stat,
self.len,
self.add_args,
self.lock,
self.info_line,
no_move_up=True)
[docs] def reset(self, i = None):
"""
convenient function to reset progress information
i [None, int] - None: reset all, int: reset process indexed by i
"""
# super(Progress, self).stop()
if i is None:
self._reset_all()
else:
self._reset_i(i)
# super(Progress, self).start()
@staticmethod
[docs] def show_stat(count_value, max_count_value, prepend, speed, tet, ttg, width, **kwargs):
"""
re implement this function in a subclass
count_value - current value of progress
max_count_value - maximum value the progress can take
prepend - some extra string to be put for example in front of the
progress display
speed - estimated speed in counts per second (use for example humanize_speed
to get readable information in string format)
tet - total elapsed time in seconds (use for example humanize_time
to get readable information in string format)
ttg - time to go in seconds (use for example humanize_time
to get readable information in string format)
"""
raise NotImplementedError
@staticmethod
[docs] def show_stat_wrapper(count,
last_count,
start_time,
max_count,
speed_calc_cycles,
width,
q,
last_speed,
prepend,
show_stat_function,
add_args,
i,
lock):
count_value, max_count_value, speed, tet, ttg, = Progress._calc(count,
last_count,
start_time,
max_count,
speed_calc_cycles,
q,
last_speed,
lock)
return show_stat_function(count_value, max_count_value, prepend, speed, tet, ttg, width, i, **add_args)
@staticmethod
[docs] def show_stat_wrapper_multi(count,
last_count,
start_time,
max_count,
speed_calc_cycles,
width,
q,
last_speed,
prepend,
show_stat_function,
len_,
add_args,
lock,
info_line,
no_move_up=False):
"""
call the static method show_stat_wrapper for each process
"""
# print(ESC_BOLD, end='')
# sys.stdout.flush()
for i in range(len_):
Progress.show_stat_wrapper(count[i],
last_count[i],
start_time[i],
max_count[i],
speed_calc_cycles,
width,
q[i],
last_speed[i],
prepend[i],
show_stat_function,
add_args,
i,
lock[i])
n = len_
if info_line is not None:
s = info_line.value.decode('utf-8')
s = s.split('\n')
n += len(s)
for si in s:
if width == 'auto':
width = get_terminal_width()
if len(si) > width:
si = si[:width]
print("{0:<{1}}".format(si, width))
if no_move_up:
n = 0
# this is only a hack to find the end
# of the message in a stream
# so ESC_HIDDEN+ESC_NO_CHAR_ATTR is a magic ending
print(ESC_MOVE_LINE_UP(n) + ESC_MY_MAGIC_ENDING, end='')
sys.stdout.flush()
[docs] def start(self):
# before printing any output to stout, we can now check this
# variable to see if any other ProgressBar has reserved that
# terminal.
if (self.__class__.__name__ in TERMINAL_PRINT_LOOP_CLASSES):
if not terminal_reserve(progress_obj=self):
log.warning("tty already reserved, NOT starting the progress loop!")
return
super(Progress, self).start()
self.show_on_exit = True
[docs] def stop(self):
"""
trigger clean up by hand, needs to be done when not using
context management via 'with' statement
- will terminate loop process
- show a last progress -> see the full 100% on exit
- releases terminal reservation
"""
super(Progress, self).stop()
terminal_unreserve(progress_obj=self, verbose=self.verbose)
if self.show_on_exit:
if not isinstance(self.pipe_handler, PipeToPrint):
myout = inMemoryBuffer()
stdout = sys.stdout
sys.stdout = myout
self._show_stat()
self.pipe_handler(myout.getvalue())
sys.stdout = stdout
else:
self._show_stat()
print()
self.show_on_exit = False
[docs]class ProgressBar(Progress):
"""
implements a progress bar similar to the one known from 'wget' or 'pv'
"""
def __init__(self, *args, **kwargs):
"""
width [int/'auto'] - the number of characters used to show the Progress bar,
use 'auto' to determine width from terminal information -> see _set_width
"""
Progress.__init__(self, *args, **kwargs)
self._PRE_PREPEND = ESC_NO_CHAR_ATTR + ESC_RED
self._POST_PREPEND = ESC_BOLD + ESC_GREEN
@staticmethod
[docs] def show_stat(count_value, max_count_value, prepend, speed, tet, ttg, width, i, **kwargs):
if (max_count_value is None) or (max_count_value == 0):
# only show current absolute progress as number and estimated speed
print("{}{}{} [{}] {}#{} ".format(ESC_NO_CHAR_ATTR,
COLTHM['PRE_COL'] + prepend + ESC_DEFAULT,
humanize_time(tet), humanize_speed(speed),
ESC_BOLD + COLTHM['BAR_COL'],
count_value))
else:
if width == 'auto':
width = get_terminal_width()
# deduce relative progress and show as bar on screen
if ttg is None:
s3 = " TTG --"
else:
s3 = " TTG {}".format(humanize_time(ttg))
s1 = "{}{}{} [{}] ".format(ESC_NO_CHAR_ATTR,
COLTHM['PRE_COL'] + prepend + ESC_DEFAULT,
humanize_time(tet),
humanize_speed(speed))
l = len_string_without_ESC(s1+s3)
l2 = width - l - 3
a = int(l2 * count_value / max_count_value)
b = l2 - a
s2 = COLTHM['BAR_COL'] + ESC_BOLD + "[" + "="*a + ">" + " "*b + "]" + ESC_RESET_BOLD + ESC_DEFAULT
print(s1+s2+s3)
[docs]class ProgressBarCounter(Progress):
"""
records also the time of each reset and calculates the speed
of the resets.
shows the TET since init (not effected by reset)
the speed of the resets (number of finished processed per time)
and the number of finished processes
after that also show a progress of each process
max_count > 0 and not None -> bar
max_count == None -> absolute count statistic
max_count == 0 -> hide process statistic at all
"""
def __init__(self, speed_calc_cycles_counter=5, **kwargs):
Progress.__init__(self, **kwargs)
self.counter_count = []
self.counter_q = []
self.counter_speed = []
for i in range(self.len):
self.counter_count.append(UnsignedIntValue(val=0))
self.counter_q.append(myQueue())
self.counter_speed.append(FloatValue())
self.counter_speed_calc_cycles = speed_calc_cycles_counter
self.init_time = time.time()
self.add_args['counter_count'] = self.counter_count
self.add_args['counter_speed'] = self.counter_speed
self.add_args['init_time'] = self.init_time
[docs] def get_counter_count(self, i=0):
return self.counter_count[i].value
def _reset_i(self, i):
c = self.counter_count[i]
with c.get_lock():
c.value += 1
count_value = c.value
q = self.counter_q[i]
current_time = time.time()
q.put((count_value, current_time))
if q.qsize() > self.counter_speed_calc_cycles:
old_count_value, old_time = q.get()
else:
old_count_value, old_time = 0, self.init_time
speed = (count_value - old_count_value) / (current_time - old_time)
self.counter_speed[i].value = speed
Progress._reset_i(self, i)
@staticmethod
[docs] def show_stat(count_value, max_count_value, prepend, speed, tet, ttg, width, i, **kwargs):
counter_count = kwargs['counter_count'][i]
counter_speed = kwargs['counter_speed'][i]
counter_tet = time.time() - kwargs['init_time']
s_c = "{}{}{} [{}] {}#{} - ".format(ESC_NO_CHAR_ATTR,
COLTHM['PRE_COL']+prepend+ESC_DEFAULT,
humanize_time(counter_tet),
humanize_speed(counter_speed.value),
COLTHM['BAR_COL'],
str(counter_count.value) + ESC_DEFAULT)
if width == 'auto':
width = get_terminal_width()
if (max_count_value is None) or (max_count_value == 0):
s_c = "{}{} [{}] {}#{} ".format(s_c,
humanize_time(tet),
humanize_speed(speed),
COLTHM['BAR_COL'],
str(count_value)+ ESC_DEFAULT)
else:
if ttg is None:
s3 = " TTG --"
else:
s3 = " TTG {}".format(humanize_time(ttg))
s1 = "{} [{}] ".format(humanize_time(tet), humanize_speed(speed))
l = len_string_without_ESC(s1 + s3 + s_c)
l2 = width - l - 3
a = int(l2 * count_value / max_count_value)
b = l2 - a
s2 = COLTHM['BAR_COL'] + ESC_BOLD + "[" + "=" * a + ">" + " " * b + "]" + ESC_RESET_BOLD + ESC_DEFAULT
s_c = s_c+s1+s2+s3
print(s_c)
[docs]class ProgressBarFancy(Progress):
"""
implements a progress bar where the color indicates the current status
similar to the bars known from 'htop'
"""
def __init__(self, *args, **kwargs):
"""
width [int/'auto'] - the number of characters used to show the Progress bar,
use 'auto' to determine width from terminal information -> see _set_width
"""
Progress.__init__(self, *args, **kwargs)
@staticmethod
[docs] def get_d(s1, s2, width, lp, lps):
d = width-len(remove_ESC_SEQ_from_string(s1))-len(remove_ESC_SEQ_from_string(s2))-2-lp-lps
if d >= 0:
d1 = d // 2
d2 = d - d1
return s1, s2, d1, d2
@staticmethod
[docs] def full_stat(p, tet, speed, ttg, eta, ort, repl_ch, width, lp, lps):
s1 = "TET {} {:>12} TTG {}".format(tet, speed, ttg)
s2 = "ETA {} ORT {}".format(eta, ort)
return ProgressBarFancy.get_d(s1, s2, width, lp, lps)
@staticmethod
[docs] def full_minor_stat(p, tet, speed, ttg, eta, ort, repl_ch, width, lp, lps):
s1 = "E {} {:>12} G {}".format(tet, speed, ttg)
s2 = "A {} O {}".format(eta, ort)
return ProgressBarFancy.get_d(s1, s2, width, lp, lps)
@staticmethod
[docs] def reduced_1_stat(p, tet, speed, ttg, eta, ort, repl_ch, width, lp, lps):
s1 = "E {} {:>12} G {}".format(tet, speed, ttg)
s2 = "O {}".format(ort)
return ProgressBarFancy.get_d(s1, s2, width, lp, lps)
@staticmethod
[docs] def reduced_2_stat(p, tet, speed, ttg, eta, ort, repl_ch, width, lp, lps):
s1 = "E {} G {}".format(tet, ttg)
s2 = "O {}".format(ort)
return ProgressBarFancy.get_d(s1, s2, width, lp, lps)
@staticmethod
[docs] def reduced_3_stat(p, tet, speed, ttg, eta, ort, repl_ch, width, lp, lps):
s1 = "E {} G {}".format(tet, ttg)
s2 = ''
return ProgressBarFancy.get_d(s1, s2, width, lp, lps)
@staticmethod
[docs] def reduced_4_stat(p, tet, speed, ttg, eta, ort, repl_ch, width, lp, lps):
s1 = ''
s2 = ''
return ProgressBarFancy.get_d(s1, s2, width, lp, lps)
@staticmethod
[docs] def kw_bold(s, ch_after):
kws = ['TET', 'TTG', 'ETA', 'ORT', 'E', 'G', 'A', 'O']
for kw in kws:
for c in ch_after:
s = s.replace(kw + c, ESC_BOLD + kw + ESC_RESET_BOLD + c)
return s
@staticmethod
def _stat(count_value, max_count_value, prepend, speed, tet, ttg, width, i, **kwargs):
if (max_count_value is None) or (max_count_value == 0):
# only show current absolute progress as number and estimated speed
stat = "{}{} [{}] {}#{} ".format(COLTHM['PRE_COL']+prepend+ESC_DEFAULT,
humanize_time(tet),
humanize_speed(speed),
COLTHM['BAR_COL'],
str(count_value) + ESC_DEFAULT)
else:
if width == 'auto':
width = get_terminal_width()
# deduce relative progress
p = count_value / max_count_value
if p < 1:
ps = " {:.1%} ".format(p)
else:
ps = " {:.0%} ".format(p)
if ttg is None:
eta = '--'
ort = None
else:
eta = datetime.datetime.fromtimestamp(time.time() + ttg).strftime("%Y%m%d_%H:%M:%S")
ort = tet + ttg
tet = humanize_time(tet)
speed = '['+humanize_speed(speed)+']'
ttg = humanize_time(ttg)
ort = humanize_time(ort)
repl_ch = '-'
lp = len(prepend)
args = p, tet, speed, ttg, eta, ort, repl_ch, width, lp, len(ps)
res = ProgressBarFancy.full_stat(*args)
if res is None:
res = ProgressBarFancy.full_minor_stat(*args)
if res is None:
res = ProgressBarFancy.reduced_1_stat(*args)
if res is None:
res = ProgressBarFancy.reduced_2_stat(*args)
if res is None:
res = ProgressBarFancy.reduced_3_stat(*args)
if res is None:
res = ProgressBarFancy.reduced_4_stat(*args)
if res is not None:
s1, s2, d1, d2 = res
s = s1 + ' '*d1 + ps + ' '*d2 + s2
s_before = s[:math.ceil(width*p)].replace(' ', repl_ch)
if (len(s_before) > 0) and (s_before[-1] == repl_ch):
s_before = s_before[:-1] + '>'
s_after = s[math.ceil(width*p):]
s_before = ProgressBarFancy.kw_bold(s_before, ch_after=[repl_ch, '>'])
s_after = ProgressBarFancy.kw_bold(s_after, ch_after=[' '])
stat = (COLTHM['PRE_COL']+prepend+ESC_DEFAULT+
COLTHM['BAR_COL']+ESC_BOLD + '[' + ESC_RESET_BOLD + s_before + ESC_DEFAULT +
s_after + ESC_BOLD + COLTHM['BAR_COL']+']' + ESC_NO_CHAR_ATTR)
else:
ps = ps.strip()
if p == 1:
ps = ' '+ps
stat = prepend + ps
return stat
@staticmethod
[docs] def show_stat(count_value, max_count_value, prepend, speed, tet, ttg, width, i, **kwargs):
stat = ProgressBarFancy._stat(count_value, max_count_value, prepend, speed, tet, ttg, width, i, **kwargs)
print(stat)
[docs]class ProgressBarCounterFancy(ProgressBarCounter):
@staticmethod
[docs] def show_stat(count_value, max_count_value, prepend, speed, tet, ttg, width, i, **kwargs):
counter_count = kwargs['counter_count'][i]
counter_speed = kwargs['counter_speed'][i]
counter_tet = time.time() - kwargs['init_time']
s_c = "{}{}{} [{}] {}#{} - ".format(ESC_NO_CHAR_ATTR,
COLTHM['PRE_COL']+prepend+ESC_DEFAULT,
humanize_time(counter_tet),
humanize_speed(counter_speed.value),
COLTHM['BAR_COL'],
str(counter_count.value) + ESC_DEFAULT)
if width == 'auto':
width = get_terminal_width()
if (max_count_value is None) or (max_count_value == 0):
s_c = "{}{} [{}] {}#{} ".format(s_c, humanize_time(tet), humanize_speed(speed),
COLTHM['BAR_COL'], str(count_value)+ESC_DEFAULT)
else:
_width = width - len_string_without_ESC(s_c)
s_c += ProgressBarFancy._stat(count_value, max_count_value, '', speed, tet, ttg, _width, i)
print(s_c)
[docs]class SIG_handler_Loop(object):
"""class to setup signal handling for the Loop class
Note: each subprocess receives the default signal handling from it's parent.
If the signal function from the module signal is evoked within the subprocess
this default behavior can be overwritten.
The init function receives a shared memory boolean object which will be set
false in case of signal detection. Since the Loop class will check the state
of this boolean object before each repetition, the loop will stop when
a signal was receives.
"""
def __init__(self, sigint, sigterm, log, prefix):
self.set_signal(signal.SIGINT, sigint)
self.set_signal(signal.SIGTERM, sigterm)
self.prefix = prefix
self.log = log
self.log.info("setup signal handler for loop (SIGINT:%s, SIGTERM:%s)", sigint, sigterm)
[docs] def set_signal(self, sig, handler_str):
if handler_str == 'ign':
signal.signal(sig, self._ignore_signal)
elif handler_str == 'stop':
signal.signal(sig, self._stop_on_signal)
else:
raise TypeError("unknown signal hander string '%s'", handler_str)
def _ignore_signal(self, signal, frame):
pass
def _stop_on_signal(self, signal, frame):
self.log.info("received sig %s -> raise InterruptedError", signal_dict[signal])
raise LoopInterruptError()
[docs]def FloatValue(val=0.):
return mp.Value('d', val, lock=True)
[docs]def UnsignedIntValue(val=0):
return mp.Value('I', val, lock=True)
[docs]def StringValue(num_of_bytes):
return mp.Array('c', _jm_compatible_bytearray(num_of_bytes), lock=True)
[docs]def check_process_termination(proc, prefix, timeout, auto_kill_on_last_resort = False):
proc.join(timeout)
if not proc.is_alive():
log.debug("termination of process (pid %s) within timeout of %ss SUCCEEDED!", proc.pid, timeout)
return True
# process still runs -> send SIGTERM -> see what happens
log.warning("termination of process (pid %s) within given timeout of %ss FAILED!", proc.pid, timeout)
proc.terminate()
new_timeout = 3*timeout
log.debug("wait for termination (timeout %s)", new_timeout)
proc.join(new_timeout)
if not proc.is_alive():
log.info("termination of process (pid %s) via SIGTERM with timeout of %ss SUCCEEDED!", proc.pid, new_timeout)
return True
log.warning("termination of process (pid %s) via SIGTERM with timeout of %ss FAILED!", proc.pid, new_timeout)
log.debug("auto_kill_on_last_resort is %s", auto_kill_on_last_resort)
answer = 'k' if auto_kill_on_last_resort else '_'
while True:
log.debug("answer string is %s", answer)
if answer == 'k':
log.warning("send SIGKILL to process with pid %s", proc.pid)
os.kill(proc.pid, signal.SIGKILL)
time.sleep(0.1)
else:
log.info("send SIGTERM to process with pid %s", proc.pid)
os.kill(proc.pid, signal.SIGTERM)
time.sleep(0.1)
if not proc.is_alive():
log.info("process (pid %s) has stopped running!", proc.pid)
return True
else:
log.warning("process (pid %s) is still running!", proc.pid)
print("the process (pid %s) seems still running".format(proc.pid))
try:
answer = input("press 'enter' to send SIGTERM, enter 'k' to send SIGKILL or enter 'ignore' to not bother about the process anymore")
except Exception as e:
log.error("could not ask for sending SIGKILL due to {}".format(type(e)))
log.info(traceback.format_exc())
log.warning("send SIGKILL now")
answer = 'k'
if answer == 'ignore':
log.warning("ignore process %s", proc.pid)
return False
elif answer != 'k':
answer = ''
[docs]def getCountKwargs(func):
""" Returns a list ["count kwarg", "count_max kwarg"] for a
given function. Valid combinations are defined in
`progress.validCountKwargs`.
Returns None if no keyword arguments are found.
"""
# Get all arguments of the function
if hasattr(func, "__code__"):
func_args = func.__code__.co_varnames[:func.__code__.co_argcount]
for pair in validCountKwargs:
if ( pair[0] in func_args and pair[1] in func_args ):
return pair
# else
return None
[docs]def get_terminal_size(defaultw=80):
""" Checks various methods to determine the terminal size
Methods:
- shutil.get_terminal_size (only Python3)
- fcntl.ioctl
- subprocess.check_output
- os.environ
Parameters
----------
defaultw : int
Default width of terminal.
Returns
-------
width, height : int
Width and height of the terminal. If one of them could not be
found, None is return in its place.
"""
if hasattr(shutil_get_terminal_size, "__call__"):
return shutil_get_terminal_size()
else:
try:
import fcntl, termios, struct
fd = 0
hw = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ,
'1234'))
return (hw[1], hw[0])
except:
try:
out = sp.check_output(["tput", "cols"])
width = int(out.decode("utf-8").strip())
return (width, None)
except:
try:
hw = (os.environ['LINES'], os.environ['COLUMNS'])
return (hw[1], hw[0])
except:
return (defaultw, None)
[docs]def get_terminal_width(default=80, name=None):
try:
width = get_terminal_size(defaultw=default)[0]
except:
width = default
return width
[docs]def humanize_speed(c_per_sec):
"""convert a speed in counts per second to counts per [s, min, h, d], choosing the smallest value greater zero.
"""
scales = [60, 60, 24]
units = ['c/s', 'c/min', 'c/h', 'c/d']
speed = c_per_sec
i = 0
if speed > 0:
while (speed < 1) and (i < len(scales)):
speed *= scales[i]
i += 1
return "{:.1f}{}".format(speed, units[i])
[docs]def humanize_time(secs):
"""convert second in to hh:mm:ss format
"""
if secs is None:
return '--'
if secs < 1:
return "{:.2f}ms".format(secs*1000)
elif secs < 10:
return "{:.2f}s".format(secs)
else:
mins, secs = divmod(secs, 60)
hours, mins = divmod(mins, 60)
return '{:02d}:{:02d}:{:02d}'.format(int(hours), int(mins), int(secs))
[docs]def len_string_without_ESC(s):
return len(remove_ESC_SEQ_from_string(s))
[docs]def remove_ESC_SEQ_from_string(s):
old_idx = 0
new_s = ""
ESC_CHAR_START = "\033["
while True:
idx = s.find(ESC_CHAR_START, old_idx)
if idx == -1:
break
j = 2
while s[idx+j] in '0123456789':
j += 1
new_s += s[old_idx:idx]
old_idx = idx+j+1
new_s += s[old_idx:]
return new_s
# for esc_seq in ESC_SEQ_SET:
# s = s.replace(esc_seq, '')
# return s
def _close_kind(stack, which_kind):
stack_tmp = []
s = ""
# close everything until which_kind is found
while True:
kind, start, end = stack.pop()
if kind != which_kind:
s += end
stack_tmp.append((kind, start, end))
else:
break
# close which_kind
s = end
# start everything that was closed before which_kind
for kind, start, end in stack_tmp:
s += start
stack.append((kind, start, end))
return s
def _close_all(stack):
s = ""
for kind, start, end in stack:
s += end
return s
def _open_color(stack, color):
start = '<span style="color:{}">'.format(color)
end = '</span>'
stack.append(('color', start, end))
return start
def _open_bold(stack):
start = '<b>'
end = '</b>'
stack.append(('bold', start, end))
return start
[docs]def ESC_SEQ_to_HTML(s):
old_idx = 0
new_s = ""
ESC_CHAR_START = "\033["
color_on = False
bold_on = False
stack = []
while True:
idx = s.find(ESC_CHAR_START, old_idx)
if idx == -1:
break
j = 2
while s[idx + j] in '0123456789':
j += 1
new_s += s[old_idx:idx]
old_idx = idx + j + 1
escseq = s[idx:idx+j+1]
if escseq in ESC_COLOR_TO_HTML: # set color
if color_on:
new_s += _close_kind(stack, which_kind = 'color')
new_s += _open_color(stack, ESC_COLOR_TO_HTML[escseq])
color_on = True
elif escseq == ESC_DEFAULT: # unset color
if color_on:
new_s += _close_kind(stack, which_kind = 'color')
color_on = False
elif escseq == ESC_BOLD:
if not bold_on:
new_s += _open_bold(stack)
bold_on = True
elif escseq == ESC_RESET_BOLD:
if bold_on:
new_s += _close_kind(stack, which_kind = 'bold')
bold_on = False
elif escseq == ESC_NO_CHAR_ATTR:
if color_on:
new_s += _close_kind(stack, which_kind = 'color')
color_on = False
if bold_on:
new_s += _close_kind(stack, which_kind = 'bold')
bold_on = False
else:
pass
new_s += s[old_idx:]
new_s += _close_all(stack)
return new_s
[docs]def terminal_reserve(progress_obj, terminal_obj=None, identifier=None):
""" Registers the terminal (stdout) for printing.
Useful to prevent multiple processes from writing progress bars
to stdout.
One process (server) prints to stdout and a couple of subprocesses
do not print to the same stdout, because the server has reserved it.
Of course, the clients have to be nice and check with
terminal_reserve first if they should (not) print.
Nothing is locked.
Returns
-------
True if reservation was successful (or if we have already reserved this tty),
False if there already is a reservation from another instance.
"""
if terminal_obj is None:
terminal_obj = sys.stdout
if identifier is None:
identifier = ''
if terminal_obj in TERMINAL_RESERVATION: # terminal was already registered
log.debug("this terminal %s has already been added to reservation list", terminal_obj)
if TERMINAL_RESERVATION[terminal_obj] is progress_obj:
log.debug("we %s have already reserved this terminal %s", progress_obj, terminal_obj)
return True
else:
log.debug("someone else %s has already reserved this terminal %s", TERMINAL_RESERVATION[terminal_obj], terminal_obj)
return False
else: # terminal not yet registered
log.debug("terminal %s was reserved for us %s", terminal_obj, progress_obj)
TERMINAL_RESERVATION[terminal_obj] = progress_obj
return True
[docs]def terminal_unreserve(progress_obj, terminal_obj=None, verbose=0, identifier=None):
""" Unregisters the terminal (stdout) for printing.
an instance (progress_obj) can only unreserve the tty (terminal_obj) when it also reserved it
see terminal_reserved for more information
Returns
-------
None
"""
if terminal_obj is None:
terminal_obj =sys.stdout
if identifier is None:
identifier = ''
else:
identifier = identifier + ': '
po = TERMINAL_RESERVATION.get(terminal_obj)
if po is None:
log.debug("terminal %s was not reserved, nothing happens", terminal_obj)
else:
if po is progress_obj:
log.debug("terminal %s now unreserned", terminal_obj)
del TERMINAL_RESERVATION[terminal_obj]
else:
log.debug("you %s can NOT unreserve terminal %s be cause it was reserved by %s", progress_obj, terminal_obj, po)
[docs]def codecov_subprocess_check():
print("this line will be only called from a subprocess")
myQueue = mp.Queue
# a mapping from the numeric values of the signals to their names used in the
# standard python module signals
signal_dict = {}
for s in dir(signal):
if s.startswith('SIG') and s[3] != '_':
n = getattr(signal, s)
if n in signal_dict:
signal_dict[n] += ('/'+s)
else:
signal_dict[n] = s
[docs]def ESC_MOVE_LINE_UP(n):
return "\033[{}A".format(n)
[docs]def ESC_MOVE_LINE_DOWN(n):
return "\033[{}B".format(n)
ESC_NO_CHAR_ATTR = "\033[0m"
ESC_BOLD = "\033[1m"
ESC_DIM = "\033[2m"
ESC_UNDERLINED = "\033[4m"
ESC_BLINK = "\033[5m"
ESC_INVERTED = "\033[7m"
ESC_HIDDEN = "\033[8m"
ESC_MY_MAGIC_ENDING = ESC_HIDDEN + ESC_NO_CHAR_ATTR
# not widely supported, use '22' instead
# ESC_RESET_BOLD = "\033[21m"
ESC_RESET_DIM = "\033[22m"
ESC_RESET_BOLD = ESC_RESET_DIM
ESC_RESET_UNDERLINED = "\033[24m"
ESC_RESET_BLINK = "\033[25m"
ESC_RESET_INVERTED = "\033[27m"
ESC_RESET_HIDDEN = "\033[28m"
ESC_DEFAULT = "\033[39m"
ESC_BLACK = "\033[30m"
ESC_RED = "\033[31m"
ESC_GREEN = "\033[32m"
ESC_YELLOW = "\033[33m"
ESC_BLUE = "\033[34m"
ESC_MAGENTA = "\033[35m"
ESC_CYAN = "\033[36m"
ESC_LIGHT_GREY = "\033[37m"
ESC_DARK_GREY = "\033[90m"
ESC_LIGHT_RED = "\033[91m"
ESC_LIGHT_GREEN = "\033[92m"
ESC_LIGHT_YELLOW = "\033[93m"
ESC_LIGHT_BLUE = "\033[94m"
ESC_LIGHT_MAGENTA = "\033[95m"
ESC_LIGHT_CYAN = "\033[96m"
ESC_WHITE = "\033[97m"
ESC_COLOR_TO_HTML = {
ESC_BLACK : '#000000',
ESC_RED : '#800000',
ESC_GREEN : '#008000',
ESC_YELLOW : '#808000',
ESC_BLUE : '#000080',
ESC_MAGENTA : '#800080',
ESC_CYAN : '#008080',
ESC_LIGHT_GREY : '#c0c0c0',
ESC_DARK_GREY : '#808080',
ESC_LIGHT_RED : '#ff0000',
ESC_LIGHT_GREEN : '#00ff00',
ESC_LIGHT_YELLOW : '#ffff00',
ESC_LIGHT_BLUE : '#0000ff',
ESC_LIGHT_MAGENTA : '#ff00ff',
ESC_LIGHT_CYAN : '#00ffff',
ESC_WHITE : '#ffffff'}
ESC_SEQ_SET = [ESC_NO_CHAR_ATTR,
ESC_BOLD,
ESC_DIM,
ESC_UNDERLINED,
ESC_BLINK,
ESC_INVERTED,
ESC_HIDDEN,
ESC_RESET_BOLD,
ESC_RESET_DIM,
ESC_RESET_UNDERLINED,
ESC_RESET_BLINK,
ESC_RESET_INVERTED,
ESC_RESET_HIDDEN,
ESC_DEFAULT,
ESC_BLACK,
ESC_RED,
ESC_GREEN,
ESC_YELLOW,
ESC_BLUE,
ESC_MAGENTA,
ESC_CYAN,
ESC_LIGHT_GREY,
ESC_DARK_GREY,
ESC_LIGHT_RED,
ESC_LIGHT_GREEN,
ESC_LIGHT_YELLOW,
ESC_LIGHT_BLUE,
ESC_LIGHT_MAGENTA,
ESC_LIGHT_CYAN,
ESC_WHITE]
_colthm_term_default = {'PRE_COL': ESC_RED, 'BAR_COL': ESC_LIGHT_GREEN}
_colthm_ipyt_default = {'PRE_COL': ESC_RED, 'BAR_COL': ESC_LIGHT_BLUE}
color_themes = {'term_default': _colthm_term_default,
'ipyt_default': _colthm_ipyt_default}
COLTHM = _colthm_term_default
[docs]def choose_color_theme(name):
global COLTHM
if name in color_themes:
COLTHM = color_themes[name]
else:
warnings.warn("no such color theme {}".format(name))
# terminal reservation list, see terminal_reserve
TERMINAL_RESERVATION = {}
# these are classes that print progress bars, see terminal_reserve
TERMINAL_PRINT_LOOP_CLASSES = ["ProgressBar", "ProgressBarCounter", "ProgressBarFancy", "ProgressBarCounterFancy"]
# keyword arguments that define counting in wrapped functions
validCountKwargs = [
[ "count", "count_max"],
[ "count", "max_count"],
[ "c", "m"],
[ "jmc", "jmm"],
]