#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import division, print_function
import copy
import datetime
import math
import multiprocessing as mp
import signal
import subprocess as sp
import sys
import time
import traceback
import os
import warnings
try:
from shutil import get_terminal_size as shutil_get_terminal_size
except ImportError:
shutil_get_terminal_size = None
if sys.version_info[0] == 2:
old_math_ceil = math.ceil
def my_int_ceil(f):
return int(old_math_ceil(f))
math.ceil = my_int_ceil
# Magic conversion from 3 to 2
if sys.version_info[0] == 2:
_jm_compatible_bytearray = lambda x: x
else:
_jm_compatible_bytearray = bytearray
[docs]class Loop(object):
"""
class to run a function periodically an seperate process.
In case the called function returns True, the loop will stop.
Otherwise a time interval given by interval will be slept before
another execution is triggered.
The shared memory variable _run (accessible via the class property run)
also determines if the function if executed another time. If set to False
the execution stops.
For safe cleanup (and in order to catch any Errors)
it is advisable to instantiate this class
using 'with' statement as follows:
with Loop(**kwargs) as my_loop:
my_loop.start()
...
this will guarantee you that the spawned loop process is
down when exiting the 'with' scope.
The only circumstance where the process is still running is
when you set auto_kill_on_last_resort to False and answer the
question to send SIGKILL with no.
"""
def __init__(self,
func,
args=(),
interval = 1,
verbose=0,
sigint='stop',
sigterm='stop',
name=None,
auto_kill_on_last_resort=False):
"""
func [callable] - function to be called periodically
args [tuple] - arguments passed to func when calling
intervall [pos number] - time to "sleep" between each call
verbose [pos integer] - specifies the level of verbosity
[0--silent, 1--important information, 2--some debug info]
sigint [string] - signal handler string to set SIGINT behavior (see below)
sigterm [string] - signal handler string to set SIGTERM behavior (see below)
name [string] - use this name in messages instead of the PID
auto_kill_on_last_resort [bool] - If set False (default), ask user to send SIGKILL
to loop process in case normal stop and SIGTERM failed. If set True, send SIDKILL
without asking.
the signal handler string may be one of the following
ing: ignore the incoming signal
stop: set the shared memory boolean to false -> prevents the loop from
repeating -> subprocess terminates when func returns and sleep time interval
has passed.
"""
self.func = func
self.args = args
self.interval = interval
self._run = mp.Value('b', False)
self._pause = mp.Value('b', False)
self.verbose = verbose
self._proc = None
self._sigint = sigint
self._sigterm = sigterm
self._name = name
self._auto_kill_on_last_resort = auto_kill_on_last_resort
if not hasattr(self, '_identifier'):
self._identifier = None
def __enter__(self):
return self
def __exit__(self, *exc_args):
# normal exit
if not self.is_alive():
if self.verbose > 1:
print("{}: has stopped on context exit".format(self._identifier))
return
# loop still runs on context exit -> __cleanup
if self.verbose > 1:
print("{}: is still running on context exit".format(self._identifier))
self.__cleanup()
def __cleanup(self):
"""
Wait at most twice as long as the given repetition interval
for the _wrapper_function to terminate.
If after that time the _wrapper_function has not terminated,
send SIGTERM to and the process.
Wait at most five times as long as the given repetition interval
for the _wrapper_function to terminate.
If the process still running send SIGKILL automatically if
auto_kill_on_last_resort was set True or ask the
user to confirm sending SIGKILL
"""
# set run to False and wait some time -> see what happens
self.run = False
if check_process_termination(proc=self._proc,
identifier=self._identifier,
timeout=2*self.interval,
verbose=self.verbose,
auto_kill_on_last_resort=self._auto_kill_on_last_resort):
if self.verbose > 1:
print("{}: cleanup successful".format(self._identifier))
self._proc = None
self._identifier = get_identifier(self._name, 'not started')
else:
raise RuntimeError("{}: cleanup FAILED!".format(self._identifier))
@staticmethod
def _wrapper_func(func, args, shared_mem_run, shared_mem_pause, interval, verbose, sigint, sigterm, name):
"""to be executed as a separate process (that's why this functions is declared static)
"""
# implement the process specific signal handler
identifier = get_identifier(name)
SIG_handler_Loop(shared_mem_run, sigint, sigterm, identifier, verbose)
while shared_mem_run.value:
# in pause mode, simply sleep
if shared_mem_pause.value:
quit_loop = False
else:
# if not pause mode -> call func and see what happens
try:
quit_loop = func(*args)
except:
err, val, trb = sys.exc_info()
print(ESC_NO_CHAR_ATTR, end='')
sys.stdout.flush()
if verbose > 0:
print("{}: error {} occurred in Loop class calling 'func(*args)'".format(identifier, err))
traceback.print_exc()
return
if quit_loop is True:
return
time.sleep(interval)
if verbose > 1:
print("{}: _wrapper_func terminates gracefully".format(identifier))
[docs] def start(self):
"""
uses multiprocess Process to call _wrapper_func in subprocess
"""
if self.is_alive():
if self.verbose > 0:
print("{}: is already running".format(self._identifier))
return
self.run = True
self._proc = mp.Process(target = Loop._wrapper_func,
args = (self.func, self.args, self._run, self._pause, self.interval,
self.verbose, self._sigint, self._sigterm, self._name),
name=self._name)
self._proc.start()
self._identifier = get_identifier(self._name, self.getpid())
if self.verbose > 1:
print("{}: started as new loop process".format(self._identifier))
[docs] def stop(self):
"""
stops the process triggered by start
Setting the shared memory boolean run to false, which should prevent
the loop from repeating. Call __cleanup to make sure the process
stopped. After that we could trigger start() again.
"""
self.run = False
if not self.is_alive():
if self.verbose > 0:
print("PID None: there is no running loop to stop")
return
self.__cleanup()
[docs] def join(self, timeout):
"""
calls join for the spawned process with given timeout
"""
if self.is_alive():
self._proc.join(timeout)
[docs] def getpid(self):
"""
return the process id of the spawned process
"""
return self._proc.pid
def is_alive(self):
if self._proc == None:
return False
else:
return self._proc.is_alive()
def pause(self):
if not self.run:
if self.verbose > 0:
print("{} is not running -> can not pause".format(self._identifier))
if self._pause.value == True:
if self.verbose > 1:
print("{} is already in pause mode!".format(self._identifier))
self._pause.value = True
def resume(self):
if not self.run:
if self.verbose > 0:
print("{} is not running -> can not resume".format(self._identifier))
if self._pause.value == False:
if self.verbose > 1:
print("{} is not in pause mode -> can not resume!".format(self._identifier))
self._pause.value = False
@property
def run(self):
"""
makes the shared memory boolean accessible as class attribute
Set run False, the loop will stop repeating.
Calling start, will set run True and start the loop again as a new process.
"""
return self._run.value
@run.setter
def run(self, run):
self._run.value = run
[docs]class Progress(Loop):
"""
Abstract Progress Loop
Uses Loop class to implement a repeating function to display the progress
of multiples processes.
In the simplest case, given only a list of shared memory values 'count' (NOTE:
a single value will be automatically mapped to a one element list),
the total elapses time (TET) and a speed estimation are calculated for
each process and passed to the display function show_stat.
This functions needs to be implemented in a subclass. It is intended to show
the progress of ONE process in one line.
When max_count is given (in general as list of shared memory values, a single
shared memory value will be mapped to a one element list) the time to go TTG
will also be calculated and passed tow show_stat.
Information about the terminal width will be retrieved when setting width='auto'.
If supported by the terminal emulator the width in characters of the terminal
emulator will be stored in width and also passed to show_stat.
Otherwise, a default width of 80 characters will be chosen.
Also you can specify a fixed width by setting width to the desired number.
NOTE: in order to achieve multiline progress special escape sequences are used
which may not be supported by all terminal emulators.
example:
c1 = UnsignedIntValue(val=0)
c2 = UnsignedIntValue(val=0)
c = [c1, c2]
prepend = ['c1: ', 'c2: ']
with ProgressCounter(count=c, prepend=prepend) as sc:
sc.start()
while True:
i = np.random.randint(0,2)
with c[i].get_lock():
c[i].value += 1
if c[0].value == 1000:
break
time.sleep(0.01)
using start() within the 'with' statement ensures that the subprocess
which is started by start() in order to show the progress repeatedly
will be terminated on context exit. Otherwise one has to make sure
that at some point the stop() routine is called. When dealing with
signal handling and exception this might become tricky, so that the
use of the 'with' statement is highly encouraged.
"""
def __init__(self,
count,
max_count = None,
prepend = None,
width = 'auto',
speed_calc_cycles = 10,
interval = 1,
verbose = 0,
sigint = 'stop',
sigterm = 'stop',
name = 'progress',
info_line = None):
"""
count [mp.Value] - shared memory to hold the current state, (list or single value)
max_count [mp.Value] - shared memory holding the final state, (None, list or single value),
may be changed by external process without having to explicitly tell this class.
If None, no TTG and relative progress can be calculated -> TTG = None
prepend [string] - string to put in front of the progress output, (None, single string
or of list of strings)
interval [int] - seconds to wait between progress print
speed_calc_cycles [int] - use the current (time, count) as
well as the (old_time, old_count) read by the show_stat function
speed_calc_cycles calls before to calculate the speed as follows:
s = count - old_count / (time - old_time)
verbose, sigint, sigterm -> see loop class
"""
self.name = name
self._identifier = get_identifier(self.name, pid='not started')
try:
for c in count:
assert isinstance(c, mp.sharedctypes.Synchronized), "each element of 'count' must be if the type multiprocessing.sharedctypes.Synchronized"
self.is_multi = True
except TypeError:
assert isinstance(count, mp.sharedctypes.Synchronized), "'count' must be if the type multiprocessing.sharedctypes.Synchronized"
self.is_multi = False
count = [count]
self.len = len(count)
if max_count is not None:
if self.is_multi:
try:
for m in max_count:
assert isinstance(m, mp.sharedctypes.Synchronized), "each element of 'max_count' must be if the type multiprocessing.sharedctypes.Synchronized"
except TypeError:
raise TypeError("'max_count' must be iterable")
else:
assert isinstance(max_count, mp.sharedctypes.Synchronized), "'max_count' must be of the type multiprocessing.sharedctypes.Synchronized"
max_count = [max_count]
else:
max_count = [None] * self.len
self.start_time = []
self.speed_calc_cycles = speed_calc_cycles
self.width = width
self.q = []
self.prepend = []
self.lock = []
self.last_count = []
self.last_old_count = []
self.last_old_time = []
for i in range(self.len):
self.q.append(myQueue()) # queue to save the last speed_calc_cycles
# (time, count) information to calculate speed
self.last_count.append(UnsignedIntValue())
self.last_old_count.append(UnsignedIntValue())
self.last_old_time.append(FloatValue())
self.lock.append(mp.Lock())
self.start_time.append(FloatValue(val=time.time()))
if prepend is None:
# no prepend given
self.prepend.append('')
else:
try:
# assume list of prepend, (needs to be a sequence)
# except if prepend is an instance of string
# the assert will cause the except to be executed
assert not isinstance(prepend, str)
self.prepend.append(prepend[i])
except:
# list fails -> assume single prepend for all
self.prepend.append(prepend)
self.max_count = max_count # list of multiprocessing value type
self.count = count # list of multiprocessing value type
self.interval = interval
self.verbose = verbose
self.show_on_exit = False
self.add_args = {}
self.info_line = info_line
# setup loop class with func
super(Progress, self).__init__(func=Progress.show_stat_wrapper_multi,
args=(self.count,
self.last_count,
self.start_time,
self.max_count,
self.speed_calc_cycles,
self.width,
self.q,
self.last_old_count,
self.last_old_time,
self.prepend,
self.__class__.show_stat,
self.len,
self.add_args,
self.lock,
self.info_line),
interval=interval,
verbose=verbose,
sigint=sigint,
sigterm=sigterm,
name=name,
auto_kill_on_last_resort=True)
def __exit__(self, *exc_args):
self.stop()
@staticmethod
def _calc(count,
last_count,
start_time,
max_count,
speed_calc_cycles,
q,
last_old_count,
last_old_time,
lock):
"""
do the pre calculations in order to get TET, speed, TTG
and call the actual display routine show_stat with these arguments
NOTE: show_stat is purely abstract and need to be reimplemented to
achieve a specific progress display.
"""
count_value = count.value
start_time_value = start_time.value
current_time = time.time()
if last_count.value != count_value:
# some progress happened
with lock:
# save current state (count, time) to queue
q.put((count_value, current_time))
# get older state from queue (or initial state)
# to to speed estimation
if q.qsize() > speed_calc_cycles:
old_count_value, old_time = q.get()
else:
old_count_value, old_time = 0, start_time_value
last_count.value = count_value
last_old_count.value = old_count_value
last_old_time.value = old_time
else:
# progress has not changed since last call
# use also old (cached) data from the queue
old_count_value, old_time = last_old_count.value, last_old_time.value
if (max_count is None):
max_count_value = None
else:
max_count_value = max_count.value
tet = (current_time - start_time_value)
speed = (count_value - old_count_value) / (current_time - old_time)
if (speed == 0) or (max_count_value is None) or (max_count_value == 0):
ttg = None
else:
ttg = math.ceil((max_count_value - count_value) / speed)
return count_value, max_count_value, speed, tet, ttg
def _reset_all(self):
"""
reset all progress information
"""
for i in range(self.len):
self._reset_i(i)
def _reset_i(self, i):
"""
reset i-th progress information
"""
self.count[i].value=0
self.lock[i].acquire()
for x in range(self.q[i].qsize()):
self.q[i].get()
self.lock[i].release()
self.start_time[i].value = time.time()
def _show_stat(self):
"""
convenient functions to call the static show_stat_wrapper_multi with
the given class members
"""
Progress.show_stat_wrapper_multi(self.count,
self.last_count,
self.start_time,
self.max_count,
self.speed_calc_cycles,
self.width,
self.q,
self.last_old_count,
self.last_old_time,
self.prepend,
self.__class__.show_stat,
self.len,
self.add_args,
self.lock,
self.info_line,
no_move_up=True)
[docs] def reset(self, i = None):
"""
convenient function to reset progress information
i [None, int] - None: reset all, int: reset process indexed by i
"""
# super(Progress, self).stop()
if i is None:
self._reset_all()
else:
self._reset_i(i)
# super(Progress, self).start()
@staticmethod
[docs] def show_stat(count_value, max_count_value, prepend, speed, tet, ttg, width, **kwargs):
"""
re implement this function in a subclass
count_value - current value of progress
max_count_value - maximum value the progress can take
prepend - some extra string to be put for example in front of the
progress display
speed - estimated speed in counts per second (use for example humanize_speed
to get readable information in string format)
tet - total elapsed time in seconds (use for example humanize_time
to get readable information in string format)
ttg - time to go in seconds (use for example humanize_time
to get readable information in string format)
"""
raise NotImplementedError
@staticmethod
def show_stat_wrapper(count,
last_count,
start_time,
max_count,
speed_calc_cycles,
width,
q,
last_old_count,
last_old_time,
prepend,
show_stat_function,
add_args,
i,
lock):
count_value, max_count_value, speed, tet, ttg, = Progress._calc(count,
last_count,
start_time,
max_count,
speed_calc_cycles,
q,
last_old_count,
last_old_time,
lock)
return show_stat_function(count_value, max_count_value, prepend, speed, tet, ttg, width, i, **add_args)
@staticmethod
[docs] def show_stat_wrapper_multi(count,
last_count,
start_time,
max_count,
speed_calc_cycles,
width,
q,
last_old_count,
last_old_time,
prepend,
show_stat_function,
len_,
add_args,
lock,
info_line,
no_move_up=False):
"""
call the static method show_stat_wrapper for each process
"""
# print(ESC_BOLD, end='')
# sys.stdout.flush()
for i in range(len_):
Progress.show_stat_wrapper(count[i],
last_count[i],
start_time[i],
max_count[i],
speed_calc_cycles,
width,
q[i],
last_old_count[i],
last_old_time[i],
prepend[i],
show_stat_function,
add_args,
i,
lock[i])
n = len_
if info_line is not None:
s = info_line.value.decode('utf-8')
s = s.split('\n')
n += len(s)
for si in s:
if width == 'auto':
width = get_terminal_width()
if len(si) > width:
si = si[:width]
print("{0:<{1}}".format(si, width))
if no_move_up:
n = 0
print(ESC_MOVE_LINE_UP(n) + ESC_NO_CHAR_ATTR, end='')
sys.stdout.flush()
def start(self):
# before printing any output to stout, we can now check this
# variable to see if any other ProgressBar has reserved that
# terminal.
if (self.__class__.__name__ in TERMINAL_PRINT_LOOP_CLASSES):
if not terminal_reserve(progress_obj=self, verbose=self.verbose, identifier=self._identifier):
if self.verbose > 1:
print("{}: tty already reserved, NOT starting the progress loop!".format(self._identifier))
return
super(Progress, self).start()
self.show_on_exit = True
[docs] def stop(self, make_sure_its_down = False):
"""
trigger clean up by hand, needs to be done when not using
context management via 'with' statement
- will terminate loop process
- show a last progress -> see the full 100% on exit
- releases terminal reservation
"""
self._auto_kill_on_last_resort = make_sure_its_down
super(Progress, self).stop()
terminal_unreserve(progress_obj=self, verbose=self.verbose, identifier=self._identifier)
if self.show_on_exit:
self._show_stat()
print()
self.show_on_exit = False
[docs]class ProgressBar(Progress):
"""
implements a progress bar similar to the one known from 'wget' or 'pv'
"""
def __init__(self,
count,
max_count=None,
width='auto',
prepend=None,
speed_calc_cycles=10,
interval=1,
verbose=0,
sigint='stop',
sigterm='stop',
name='progress_bar',
info_line=None):
"""
width [int/'auto'] - the number of characters used to show the Progress bar,
use 'auto' to determine width from terminal information -> see _set_width
"""
super(ProgressBar, self).__init__(count=count,
max_count=max_count,
prepend=prepend,
speed_calc_cycles=speed_calc_cycles,
width=width,
interval=interval,
verbose = verbose,
sigint=sigint,
sigterm=sigterm,
name=name,
info_line=info_line)
self._PRE_PREPEND = ESC_NO_CHAR_ATTR + ESC_RED
self._POST_PREPEND = ESC_BOLD + ESC_GREEN
@staticmethod
def show_stat(count_value, max_count_value, prepend, speed, tet, ttg, width, i, **kwargs):
if max_count_value is None:
# only show current absolute progress as number and estimated speed
print("{}{}{}{} [{}] #{} ".format(ESC_NO_CHAR_ATTR + ESC_RED,
prepend,
ESC_BOLD + ESC_GREEN,
humanize_time(tet), humanize_speed(speed), count_value))
else:
if width == 'auto':
width = get_terminal_width()
# deduce relative progress and show as bar on screen
if ttg is None:
s3 = "] TTG --"
else:
s3 = "] TTG {}".format(humanize_time(ttg))
s1 = "{}{}{}{} [{}] [".format(ESC_NO_CHAR_ATTR + ESC_RED,
prepend,
ESC_BOLD + ESC_GREEN,
humanize_time(tet),
humanize_speed(speed))
l = len_string_without_ESC(s1+s3)
if max_count_value != 0:
l2 = width - l - 1
a = int(l2 * count_value / max_count_value)
b = l2 - a
s2 = "="*a + ">" + " "*b
else:
s2 = " "*(width - l)
print(s1+s2+s3)
[docs]class ProgressBarCounter(Progress):
"""
records also the time of each reset and calculates the speed
of the resets.
shows the TET since init (not effected by reset)
the speed of the resets (number of finished processed per time)
and the number of finished processes
after that also show a progress of each process
max_count > 0 and not None -> bar
max_count == None -> absolute count statistic
max_count == 0 -> hide process statistic at all
"""
def __init__(self,
count,
max_count=None,
prepend=None,
speed_calc_cycles_bar=10,
speed_calc_cycles_counter=5,
width='auto',
interval=1,
verbose=0,
sigint='stop',
sigterm='stop',
name='progress_bar_counter',
info_line=None):
super(ProgressBarCounter, self).__init__(count=count,
max_count=max_count,
prepend=prepend,
speed_calc_cycles=speed_calc_cycles_bar,
width=width,
interval=interval,
verbose = verbose,
sigint=sigint,
sigterm=sigterm,
name=name,
info_line=info_line)
self.counter_count = []
self.counter_q = []
self.counter_speed = []
for i in range(self.len):
self.counter_count.append(UnsignedIntValue(val=0))
self.counter_q.append(myQueue())
self.counter_speed.append(FloatValue())
self.counter_speed_calc_cycles = speed_calc_cycles_counter
self.init_time = time.time()
self.add_args['counter_count'] = self.counter_count
self.add_args['counter_speed'] = self.counter_speed
self.add_args['init_time'] = self.init_time
def get_counter_count(self, i=0):
return self.counter_count[i].value
def _reset_i(self, i):
c = self.counter_count[i]
with c.get_lock():
c.value += 1
count_value = c.value
q = self.counter_q[i]
current_time = time.time()
q.put((count_value, current_time))
if q.qsize() > self.counter_speed_calc_cycles:
old_count_value, old_time = q.get()
else:
old_count_value, old_time = 0, self.init_time
speed = (count_value - old_count_value) / (current_time - old_time)
self.counter_speed[i].value = speed
super(ProgressBarCounter, self)._reset_i(i)
@staticmethod
def show_stat(count_value, max_count_value, prepend, speed, tet, ttg, width, i, **kwargs):
counter_count = kwargs['counter_count'][i]
counter_speed = kwargs['counter_speed'][i]
counter_tet = time.time() - kwargs['init_time']
s_c = "{}{}{}{} [{}] #{}".format(ESC_NO_CHAR_ATTR + ESC_RED,
prepend,
ESC_BOLD + ESC_GREEN,
humanize_time(counter_tet),
humanize_speed(counter_speed.value),
counter_count.value)
if width == 'auto':
width = get_terminal_width()
if max_count_value != 0:
s_c += ' - '
if max_count_value is None:
s_c = "{}{} [{}] #{} ".format(s_c, humanize_time(tet), humanize_speed(speed), count_value)
else:
if ttg is None:
s3 = "] TTG --"
else:
s3 = "] TTG {}".format(humanize_time(ttg))
s1 = "{} [{}] [".format(humanize_time(tet), humanize_speed(speed))
l = len_string_without_ESC(s1 + s3 + s_c)
l2 = width - l - 1
a = int(l2 * count_value / max_count_value)
b = l2 - a
s2 = "="*a + ">" + " "*b
s_c = s_c+s1+s2+s3
print(s_c + ' '*(width - len_string_without_ESC(s_c)))
[docs]class ProgressBarFancy(Progress):
"""
implements a progress bar where the color indicates the current status
similar to the bars known from 'htop'
"""
def __init__(self,
count,
max_count=None,
width='auto',
prepend=None,
speed_calc_cycles=10,
interval=1,
verbose=0,
sigint='stop',
sigterm='stop',
name='progress_bar',
info_line=None):
"""
width [int/'auto'] - the number of characters used to show the Progress bar,
use 'auto' to determine width from terminal information -> see _set_width
"""
if not self.__class__.__name__ in TERMINAL_PRINT_LOOP_CLASSES:
TERMINAL_PRINT_LOOP_CLASSES.append(self.__class__.__name__)
super(ProgressBarFancy, self).__init__(count=count,
max_count=max_count,
prepend=prepend,
speed_calc_cycles=speed_calc_cycles,
width=width,
interval=interval,
verbose = verbose,
sigint=sigint,
sigterm=sigterm,
name=name,
info_line=info_line)
@staticmethod
def get_d(s1, s2, width, lp, lps):
d = width-len(remove_ESC_SEQ_from_string(s1))-len(remove_ESC_SEQ_from_string(s2))-2-lp-lps
if d >= 0:
d1 = d // 2
d2 = d - d1
return s1, s2, d1, d2
@staticmethod
def full_stat(p, tet, speed, ttg, eta, ort, repl_ch, width, lp, lps):
s1 = "TET {} {:>12} TTG {}".format(tet, speed, ttg)
s2 = "ETA {} ORT {}".format(eta, ort)
return ProgressBarFancy.get_d(s1, s2, width, lp, lps)
@staticmethod
def full_minor_stat(p, tet, speed, ttg, eta, ort, repl_ch, width, lp, lps):
s1 = "E {} {:>12} G {}".format(tet, speed, ttg)
s2 = "A {} O {}".format(eta, ort)
return ProgressBarFancy.get_d(s1, s2, width, lp, lps)
@staticmethod
def reduced_1_stat(p, tet, speed, ttg, eta, ort, repl_ch, width, lp, lps):
s1 = "E {} {:>12} G {}".format(tet, speed, ttg)
s2 = "O {}".format(ort)
return ProgressBarFancy.get_d(s1, s2, width, lp, lps)
@staticmethod
def reduced_2_stat(p, tet, speed, ttg, eta, ort, repl_ch, width, lp, lps):
s1 = "E {} G {}".format(tet, ttg)
s2 = "O {}".format(ort)
return ProgressBarFancy.get_d(s1, s2, width, lp, lps)
@staticmethod
def reduced_3_stat(p, tet, speed, ttg, eta, ort, repl_ch, width, lp, lps):
s1 = "E {} G {}".format(tet, ttg)
s2 = ''
return ProgressBarFancy.get_d(s1, s2, width, lp, lps)
@staticmethod
def reduced_4_stat(p, tet, speed, ttg, eta, ort, repl_ch, width, lp, lps):
s1 = ''
s2 = ''
return ProgressBarFancy.get_d(s1, s2, width, lp, lps)
@staticmethod
def kw_bold(s, ch_after):
kws = ['TET', 'TTG', 'ETA', 'ORT', 'E', 'G', 'A', 'O']
for kw in kws:
for c in ch_after:
s = s.replace(kw + c, ESC_BOLD + kw + ESC_RESET_BOLD + c)
return s
@staticmethod
def _stat(count_value, max_count_value, prepend, speed, tet, ttg, width, i, **kwargs):
if max_count_value is None:
# only show current absolute progress as number and estimated speed
stat = "{}{} [{}] #{} ".format(prepend, humanize_time(tet), humanize_speed(speed), count_value)
else:
if width == 'auto':
width = get_terminal_width()
# deduce relative progress
p = count_value / max_count_value
if p < 1:
ps = " {:.1%} ".format(p)
else:
ps = " {:.0%} ".format(p)
if ttg is None:
eta = '--'
ort = None
else:
eta = datetime.datetime.fromtimestamp(time.time() + ttg).strftime("%Y%m%d_%H:%M:%S")
ort = tet + ttg
tet = humanize_time(tet)
speed = '['+humanize_speed(speed)+']'
ttg = humanize_time(ttg)
ort = humanize_time(ort)
repl_ch = '-'
lp = len(prepend)
args = p, tet, speed, ttg, eta, ort, repl_ch, width, lp, len(ps)
res = ProgressBarFancy.full_stat(*args)
if res is None:
res = ProgressBarFancy.full_minor_stat(*args)
if res is None:
res = ProgressBarFancy.reduced_1_stat(*args)
if res is None:
res = ProgressBarFancy.reduced_2_stat(*args)
if res is None:
res = ProgressBarFancy.reduced_3_stat(*args)
if res is None:
res = ProgressBarFancy.reduced_4_stat(*args)
if res is not None:
s1, s2, d1, d2 = res
s = s1 + ' '*d1 + ps + ' '*d2 + s2
s_before = s[:math.ceil(width*p)].replace(' ', repl_ch)
if (len(s_before) > 0) and (s_before[-1] == repl_ch):
s_before = s_before[:-1] + '>'
s_after = s[math.ceil(width*p):]
s_before = ProgressBarFancy.kw_bold(s_before, ch_after=[repl_ch, '>'])
s_after = ProgressBarFancy.kw_bold(s_after, ch_after=[' '])
stat = prepend + ESC_BOLD + '[' + ESC_RESET_BOLD + ESC_LIGHT_GREEN + s_before + ESC_DEFAULT + s_after + ESC_BOLD + ']' + ESC_NO_CHAR_ATTR
else:
ps = ps.strip()
if p == 1:
ps = ' '+ps
stat = prepend + ps
return stat
@staticmethod
def show_stat(count_value, max_count_value, prepend, speed, tet, ttg, width, i, **kwargs):
stat = ProgressBarFancy._stat(count_value, max_count_value, prepend, speed, tet, ttg, width, i, **kwargs)
print(stat)
class ProgressBarCounterFancy(ProgressBarCounter):
@staticmethod
def show_stat(count_value, max_count_value, prepend, speed, tet, ttg, width, i, **kwargs):
counter_count = kwargs['counter_count'][i]
counter_speed = kwargs['counter_speed'][i]
counter_tet = time.time() - kwargs['init_time']
s_c = "{}{}{}{} {:>12} #{}".format(ESC_RED,
prepend,
ESC_NO_CHAR_ATTR,
humanize_time(counter_tet),
'['+humanize_speed(counter_speed.value)+']',
counter_count.value)
if width == 'auto':
width = get_terminal_width()
if max_count_value != 0:
s_c += ' '
if max_count_value is None:
s_c = "{}{} {:>12} #{} ".format(s_c, humanize_time(tet), '['+humanize_speed(speed)+']', count_value)
else:
_width = width - len_string_without_ESC(s_c)
s_c += ProgressBarFancy._stat(count_value, max_count_value, '', speed, tet, ttg, _width, i)
print(s_c + ' '*(width - len_string_without_ESC(s_c)))
class ProgressSilentDummy(Progress):
def __init__(self, **kwargs):
pass
def __exit__(self, *exc_args):
pass
def start(self):
pass
def _reset_i(self, i):
pass
def reset(self, i):
pass
def _reset_all(self):
pass
def stop(self):
pass
def pause(self):
pass
def resume(self):
pass
[docs]class SIG_handler_Loop(object):
"""class to setup signal handling for the Loop class
Note: each subprocess receives the default signal handling from it's parent.
If the signal function from the module signal is evoked within the subprocess
this default behavior can be overwritten.
The init function receives a shared memory boolean object which will be set
false in case of signal detection. Since the Loop class will check the state
of this boolean object before each repetition, the loop will stop when
a signal was receives.
"""
def __init__(self, shared_mem_run, sigint, sigterm, identifier, verbose=0):
self.shared_mem_run = shared_mem_run
self.set_signal(signal.SIGINT, sigint)
self.set_signal(signal.SIGTERM, sigterm)
self.verbose=verbose
self.identifier = identifier
if self.verbose > 1:
print("{}: setup signal handler for loop (SIGINT:{}, SIGTERM:{})".format(self.identifier, sigint, sigterm))
def set_signal(self, sig, handler_str):
if handler_str == 'ign':
signal.signal(sig, self._ignore_signal)
elif handler_str == 'stop':
signal.signal(sig, self._stop_on_signal)
else:
raise TypeError("unknown signal hander string '{}'".format(handler_str))
def _ignore_signal(self, signal, frame):
pass
def _stop_on_signal(self, signal, frame):
if self.verbose > 0:
print("{}: received sig {} -> set run false".format(self.identifier, signal_dict[signal]))
self.shared_mem_run.value = False
# class ProgressCounter(Progress):
# """
# simple Progress counter, not using the max_count information
# """
# def __init__(self,
# count,
# max_count=None,
# prepend=None,
# speed_calc_cycles=10,
# width='auto',
# interval=1,
# verbose=0,
# sigint='stop',
# sigterm='stop',
# name='progress_counter'):
#
# super(ProgressCounter, self).__init__(count=count,
# max_count=max_count,
# prepend=prepend,
# speed_calc_cycles=speed_calc_cycles,
# width=width,
# interval=interval,
# verbose = verbose,
# sigint=sigint,
# sigterm=sigterm,
# name=name)
#
# @staticmethod
# def show_stat(count_value, max_count_value, prepend, speed, tet, ttg, width, i, **kwargs):
# if max_count_value is not None:
# max_count_str = "/{}".format(max_count_value)
# else:
# max_count_value = count_value + 1
# max_count_str = ""
#
# s = "{}{} [{}{}] ({})".format(prepend, humanize_time(tet), count_value, max_count_str, humanize_speed(speed))
# print(s)
def ESC_MOVE_LINE_UP(n):
return "\033[{}A".format(n)
def ESC_MOVE_LINE_DOWN(n):
return "\033[{}B".format(n)
def FloatValue(val=0.):
return mp.Value('d', val, lock=True)
def UnsignedIntValue(val=0):
return mp.Value('I', val, lock=True)
def StringValue(num_of_bytes):
return mp.Array('c', _jm_compatible_bytearray(num_of_bytes), lock=True)
def check_process_termination(proc, identifier, timeout, verbose=0, auto_kill_on_last_resort = False):
proc.join(timeout)
if not proc.is_alive():
if verbose > 1:
print("{}: loop termination within given timeout of {}s SUCCEEDED!".format(identifier, timeout))
return True
# process still runs -> send SIGTERM -> see what happens
if verbose > 0:
print("{}: loop termination within given timeout of {}s FAILED!".format(identifier, timeout))
proc.terminate()
new_timeout = 3*timeout
proc.join(new_timeout)
if not proc.is_alive():
if verbose > 0:
print("{}: loop termination via SIGTERM with timeout of {}s SUCCEEDED!".format(identifier, new_timeout))
return True
if verbose > 0:
print("{}: loop termination via SIGTERM with timeout of {}s FAILED!".format(identifier, new_timeout))
answer = 'y' if auto_kill_on_last_resort else '_'
while True:
if answer == 'y':
print("{}: send SIGKILL to".format(identifier))
os.kill(proc.pid, signal.SIGKILL)
time.sleep(0.1)
if not proc.is_alive():
print("{}: has stopped running!".format(identifier))
return True
else:
print("{}: still running!".format(identifier))
answer = '_'
while not answer in 'yn':
print("Do you want to send SIGKILL to '{}'? [y/n]: ".format(identifier), end='')
sys.stdout.flush()
answer = sys.stdin.readline()[:-1]
if answer == 'n':
while not answer in 'yn':
print("Do you want let the process '{}' running? [y/n]: ".format(identifier), end='')
sys.stdout.flush()
answer = sys.stdin.readline()[:-1]
if answer == 'y':
print("{}: keeps running".format(identifier))
return False
def get_identifier(name=None, pid=None, bold=True):
if pid == None:
pid = os.getpid()
if bold:
esc_bold = ESC_BOLD
esc_no_char_attr = ESC_NO_CHAR_ATTR
else:
esc_bold = ""
esc_no_char_attr = ""
if name == None:
return "{}PID {}{}".format(esc_bold, pid, esc_no_char_attr)
else:
return "{}{} ({}){}".format(esc_bold, name, pid, esc_no_char_attr)
[docs]def get_terminal_size(defaultw=80):
""" Checks various methods to determine the terminal size
Methods:
- shutil.get_terminal_size (only Python3)
- fcntl.ioctl
- subprocess.check_output
- os.environ
Parameters
----------
defaultw : int
Default width of terminal.
Returns
-------
width, height : int
Width and height of the terminal. If one of them could not be
found, None is return in its place.
"""
if hasattr(shutil_get_terminal_size, "__call__"):
return shutil_get_terminal_size()
else:
try:
import fcntl, termios, struct
fd = 0
hw = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ,
'1234'))
return (hw[1], hw[0])
except:
try:
out = sp.check_output(["tput", "cols"])
width = int(out.decode("utf-8").strip())
return (width, None)
except:
try:
hw = (os.environ['LINES'], os.environ['COLUMNS'])
return (hw[1], hw[0])
except:
return (defaultw, None)
def get_terminal_width(default=80, name=None, verbose=0):
id = get_identifier(name=name)
try:
width = get_terminal_size(defaultw=default)[0]
except:
width = default
if verbose > 1:
print("{}: use terminal width {}".format(id, width))
return width
[docs]def humanize_speed(c_per_sec):
"""convert a speed in counts per second to counts per [s, min, h, d], choosing the smallest value greater zero.
"""
scales = [60, 60, 24]
units = ['c/s', 'c/min', 'c/h', 'c/d']
speed = c_per_sec
i = 0
if speed > 0:
while (speed < 1) and (i < len(scales)):
speed *= scales[i]
i += 1
return "{:.1f}{}".format(speed, units[i])
[docs]def humanize_time(secs):
"""convert second in to hh:mm:ss format
"""
if secs is None:
return '--'
mins, secs = divmod(secs, 60)
hours, mins = divmod(mins, 60)
return '{:02d}:{:02d}:{:02d}'.format(int(hours), int(mins), int(secs))
def len_string_without_ESC(s):
return len(remove_ESC_SEQ_from_string(s))
def printQueue(q, lock=None):
if lock is not None:
lock.acquire()
res = []
for i in range(q.qsize()):
item = q.get()
res.append(copy.deepcopy(item[0]))
q.put(item)
if lock is not None:
lock.release()
print(res)
def remove_ESC_SEQ_from_string(s):
for esc_seq in ESC_SEQ_SET:
s = s.replace(esc_seq, '')
return s
[docs]def terminal_reserve(progress_obj, terminal_obj=None, verbose=0, identifier=None):
""" Registers the terminal (stdout) for printing.
Useful to prevent multiple processes from writing progress bars
to stdout.
One process (server) prints to stdout and a couple of subprocesses
do not print to the same stdout, because the server has reserved it.
Of course, the clients have to be nice and check with
terminal_reserve first if they should (not) print.
Nothing is locked.
Returns
-------
True if reservation was successful (or if we have already reserved this tty),
False if there already is a reservation from another instance.
"""
if terminal_obj is None:
terminal_obj = sys.stdout
if identifier is None:
identifier = ''
else:
identifier = identifier + ': '
if terminal_obj in TERMINAL_RESERVATION: # terminal was already registered
if verbose > 1:
print("{}this terminal {} has already been added to reservation list".format(identifier, terminal_obj))
if TERMINAL_RESERVATION[terminal_obj] is progress_obj:
if verbose > 1:
print("{}we {} have already reserved this terminal {}".format(identifier, progress_obj, terminal_obj))
return True
else:
if verbose > 1:
print("{}someone else {} has already reserved this terminal {}".format(identifier, TERMINAL_RESERVATION[terminal_obj], terminal_obj))
return False
else: # terminal not yet registered
if verbose > 1:
print("{}terminal {} was reserved for us {}".format(identifier, terminal_obj, progress_obj))
TERMINAL_RESERVATION[terminal_obj] = progress_obj
return True
[docs]def terminal_unreserve(progress_obj, terminal_obj=None, verbose=0, identifier=None):
""" Unregisters the terminal (stdout) for printing.
an instance (progress_obj) can only unreserve the tty (terminal_obj) when it also reserved it
see terminal_reserved for more information
Returns
-------
None
"""
if terminal_obj is None:
terminal_obj =sys.stdout
if identifier is None:
identifier = ''
else:
identifier = identifier + ': '
po = TERMINAL_RESERVATION.get(terminal_obj)
if po is None:
if verbose > 1:
print("{}terminal {} was not reserved, nothing happens".format(identifier, terminal_obj))
else:
if po is progress_obj:
if verbose > 1:
print("{}terminal {} now unreserned".format(identifier, terminal_obj))
del TERMINAL_RESERVATION[terminal_obj]
else:
if verbose > 1:
print("{}you {} can NOT unreserve terminal {} be cause it was reserved by {}".format(identifier, progress_obj, terminal_obj, po))
myQueue = mp.Queue
# a mapping from the numeric values of the signals to their names used in the
# standard python module signals
signal_dict = {}
for s in dir(signal):
if s.startswith('SIG') and s[3] != '_':
n = getattr(signal, s)
if n in signal_dict:
signal_dict[n] += ('/'+s)
else:
signal_dict[n] = s
ESC_NO_CHAR_ATTR = "\033[0m"
ESC_BOLD = "\033[1m"
ESC_DIM = "\033[2m"
ESC_UNDERLINED = "\033[4m"
ESC_BLINK = "\033[5m"
ESC_INVERTED = "\033[7m"
ESC_HIDDEN = "\033[8m"
# not widely supported, use '22' instead
# ESC_RESET_BOLD = "\033[21m"
ESC_RESET_DIM = "\033[22m"
ESC_RESET_BOLD = ESC_RESET_DIM
ESC_RESET_UNDERLINED = "\033[24m"
ESC_RESET_BLINK = "\033[25m"
ESC_RESET_INVERTED = "\033[27m"
ESC_RESET_HIDDEN = "\033[28m"
ESC_DEFAULT = "\033[39m"
ESC_BLACK = "\033[30m"
ESC_RED = "\033[31m"
ESC_GREEN = "\033[32m"
ESC_YELLOW = "\033[33m"
ESC_BLUE = "\033[34m"
ESC_MAGENTA = "\033[35m"
ESC_CYAN = "\033[36m"
ESC_LIGHT_GREY = "\033[37m"
ESC_DARK_GREY = "\033[90m"
ESC_LIGHT_RED = "\033[91m"
ESC_LIGHT_GREEN = "\033[92m"
ESC_LIGHT_YELLOW = "\033[93m"
ESC_LIGHT_BLUE = "\033[94m"
ESC_LIGHT_MAGENTA = "\033[95m"
ESC_LIGHT_CYAN = "\033[96m"
ESC_WHITE = "\033[97m"
ESC_SEQ_SET = [ESC_NO_CHAR_ATTR,
ESC_BOLD,
ESC_DIM,
ESC_UNDERLINED,
ESC_BLINK,
ESC_INVERTED,
ESC_HIDDEN,
ESC_RESET_BOLD,
ESC_RESET_DIM,
ESC_RESET_UNDERLINED,
ESC_RESET_BLINK,
ESC_RESET_INVERTED,
ESC_RESET_HIDDEN,
ESC_DEFAULT,
ESC_BLACK,
ESC_RED,
ESC_GREEN,
ESC_YELLOW,
ESC_BLUE,
ESC_MAGENTA,
ESC_CYAN,
ESC_LIGHT_GREY,
ESC_DARK_GREY,
ESC_LIGHT_RED,
ESC_LIGHT_GREEN,
ESC_LIGHT_YELLOW,
ESC_LIGHT_BLUE,
ESC_LIGHT_MAGENTA,
ESC_LIGHT_CYAN,
ESC_WHITE]
# terminal reservation list, see terminal_reserve
TERMINAL_RESERVATION = {}
# these are classes that print progress bars, see terminal_reserve
TERMINAL_PRINT_LOOP_CLASSES = ["ProgressBar", "ProgressBarCounter"]