jobmanager documentation

Main module

Easy distributed computing, based on the Python class SyncManager for remote communication and the Python module multiprocessing for local parallelism.

Scheduling across different processes/machines is implemented in the core modules jobmanager.jobmanager, jobmanager.servers, and jobmanager.clients.

JobManager_Client(server, authkey[, port, ...]) Calls the function self.func with arguments fetched from the job_q.
JobManager_Server(authkey[, const_arg, ...]) general usage:
(Figure: server/client communication scheme)

Progress classes are implemented in the jobmanager.progress submodule. Intuitive access to progress bars is facilitated with decorators (jobmanager.decorators).

Fork jobmanager on GitHub.

Progress monitoring

class jobmanager.progress.Loop(func, args=(), interval=1, verbose=0, sigint='stop', sigterm='stop', name=None, auto_kill_on_last_resort=False)[source]

class to run a function periodically in a separate process.

If the called function returns True, the loop stops. Otherwise the loop sleeps for the time given by interval before triggering the next execution.

The shared memory variable _run (accessible via the class property run) also determines whether the function is executed again. If set to False, execution stops.

For safe cleanup (and in order to catch any errors) it is advisable to instantiate this class using the ‘with’ statement as follows:

with Loop(**kwargs) as my_loop:
    my_loop.start()
    ...

This guarantees that the spawned loop process is down when the ‘with’ scope is exited.

The only circumstance in which the process keeps running is when auto_kill_on_last_resort is set to False and the interactive question whether to send SIGKILL is answered with no.
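
A minimal usage sketch (assuming jobmanager is importable; the printing loop body is illustrative only):

import time
from jobmanager.progress import Loop

def beat():
    print("loop is alive ...")
    # returning True here would stop the loop

with Loop(func=beat, interval=0.5) as my_loop:
    my_loop.start()
    time.sleep(2)    # let the loop run a few times
    my_loop.stop()   # explicit stop; leaving the 'with' scope also cleans up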

Attributes

run makes the shared memory boolean accessible as class attribute

Methods

getpid() return the process id of the spawned process
is_alive()
join(timeout) calls join for the spawned process with given timeout
pause()
resume()
start() uses multiprocessing.Process to call _wrapper_func in a subprocess
stop() stops the process triggered by start
getpid()[source]

return the process id of the spawned process

join(timeout)[source]

calls join for the spawned process with given timeout

run

makes the shared memory boolean accessible as class attribute

Setting run to False stops the loop from repeating. Calling start sets run to True and starts the loop again as a new process.

start()[source]

uses multiprocessing.Process to call _wrapper_func in a subprocess

stop()[source]

stops the process triggered by start

Sets the shared memory boolean run to False, which should prevent the loop from repeating, and calls __cleanup to make sure the process has stopped. After that, start() may be triggered again.

class jobmanager.progress.Progress(count, max_count=None, prepend=None, width='auto', speed_calc_cycles=10, interval=1, verbose=0, sigint='stop', sigterm='stop', name='progress', info_line=None)[source]

Abstract Progress Loop

Uses the Loop class to implement a repeating function that displays the progress of multiple processes.

In the simplest case, given only a list of shared memory values ‘count’ (NOTE: a single value will be automatically mapped to a one-element list), the total elapsed time (TET) and a speed estimate are calculated for each process and passed to the display function show_stat.

This function needs to be implemented in a subclass. It is intended to show the progress of ONE process in one line.

When max_count is given (in general as a list of shared memory values; a single shared memory value will be mapped to a one-element list), the time to go (TTG) will also be calculated and passed to show_stat.

Information about the terminal width will be retrieved when setting width=’auto’. If supported by the terminal emulator the width in characters of the terminal emulator will be stored in width and also passed to show_stat. Otherwise, a default width of 80 characters will be chosen.

You can also specify a fixed width by setting width to the desired number.

NOTE: in order to achieve multiline progress special escape sequences are used which may not be supported by all terminal emulators.

example:

c1 = UnsignedIntValue(val=0)
c2 = UnsignedIntValue(val=0)

c = [c1, c2]
prepend = ['c1: ', 'c2: ']
with ProgressCounter(count=c, prepend=prepend) as sc:
    sc.start()
    while True:
        i = np.random.randint(0, 2)
        with c[i].get_lock():
            c[i].value += 1
        if c[0].value == 1000:
            break
        time.sleep(0.01)

Calling start() within the ‘with’ statement ensures that the subprocess spawned by start() to repeatedly show the progress is terminated on context exit. Otherwise one has to make sure that the stop() routine is called at some point. When dealing with signal handling and exceptions this can become tricky, so the use of the ‘with’ statement is highly encouraged.

Attributes

run makes the shared memory boolean accessible as class attribute

Methods

getpid() return the process id of the spawned process
is_alive()
join(timeout) calls join for the spawned process with given timeout
pause()
reset([i]) convenient function to reset progress information
resume()
show_stat(count_value, max_count_value, ...) reimplement this function in a subclass
show_stat_wrapper(count, last_count, ...)
show_stat_wrapper_multi(count, last_count, ...) call the static method show_stat_wrapper for each process
start()
stop([make_sure_its_down]) trigger clean up by hand, needs to be done when not using
reset(i=None)[source]

convenient function to reset progress information

i [None, int] - None: reset all, int: reset process indexed by i

static show_stat(count_value, max_count_value, prepend, speed, tet, ttg, width, **kwargs)[source]

reimplement this function in a subclass

count_value - current value of progress

max_count_value - maximum value the progress can take

prepend - some extra string to be put for example in front of the progress display

speed - estimated speed in counts per second (use for example humanize_speed to get readable information in string format)

tet - total elapsed time in seconds (use for example humanize_time to get readable information in string format)

ttg - time to go in seconds (use for example humanize_time to get readable information in string format)
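
A minimal sketch of a subclass implementing show_stat (the class name MyProgress and the print-based one-line display are illustrative assumptions):

from jobmanager.progress import Progress, humanize_speed, humanize_time

class MyProgress(Progress):
    @staticmethod
    def show_stat(count_value, max_count_value, prepend, speed, tet, ttg,
                  width, **kwargs):
        # one status line per process
        print("{}{}/{} at {} | TET {} | TTG {}".format(
            prepend, count_value, max_count_value,
            humanize_speed(speed), humanize_time(tet), humanize_time(ttg)))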

static show_stat_wrapper_multi(count, last_count, start_time, max_count, speed_calc_cycles, width, q, last_old_count, last_old_time, prepend, show_stat_function, len_, add_args, lock, info_line, no_move_up=False)[source]

call the static method show_stat_wrapper for each process

stop(make_sure_its_down=False)[source]

triggers cleanup by hand; this needs to be done when not using context management via the ‘with’ statement

  • terminates the loop process
  • shows a last progress -> see the full 100% on exit
  • releases the terminal reservation
class jobmanager.progress.ProgressBar(count, max_count=None, width='auto', prepend=None, speed_calc_cycles=10, interval=1, verbose=0, sigint='stop', sigterm='stop', name='progress_bar', info_line=None)[source]

implements a progress bar similar to the one known from ‘wget’ or ‘pv’
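
A minimal usage sketch with a single shared counter (the helper UnsignedIntValue is defined inline here, mirroring the wrapper example below):

import time
import multiprocessing as mp
from jobmanager.progress import ProgressBar

def UnsignedIntValue(val=0):
    return mp.Value('I', val, lock=True)

count = UnsignedIntValue(0)
max_count = UnsignedIntValue(100)

with ProgressBar(count=count, max_count=max_count, interval=0.3) as pb:
    pb.start()
    for _ in range(100):
        with count.get_lock():
            count.value += 1
        time.sleep(0.02)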

Attributes

run makes the shared memory boolean accessible as class attribute

Methods

getpid() return the process id of the spawned process
is_alive()
join(timeout) calls join for the spawned process with given timeout
pause()
reset([i]) convenient function to reset progress information
resume()
show_stat(count_value, max_count_value, ...)
show_stat_wrapper(count, last_count, ...)
show_stat_wrapper_multi(count, last_count, ...) call the static method show_stat_wrapper for each process
start()
stop([make_sure_its_down]) trigger clean up by hand, needs to be done when not using
class jobmanager.progress.ProgressBarCounter(count, max_count=None, prepend=None, speed_calc_cycles_bar=10, speed_calc_cycles_counter=5, width='auto', interval=1, verbose=0, sigint='stop', sigterm='stop', name='progress_bar_counter', info_line=None)[source]

also records the time of each reset and calculates the speed of the resets.

Shows the TET since init (not affected by reset), the speed of the resets (number of finished processes per time) and the number of finished processes.

After that, the progress of each process is shown:

  • max_count > 0 and not None -> bar
  • max_count == None -> absolute count statistic
  • max_count == 0 -> hide the process statistic entirely

Attributes

run makes the shared memory boolean accessible as class attribute

Methods

get_counter_count([i])
getpid() return the process id of the spawned process
is_alive()
join(timeout) calls join for the spawned process with given timeout
pause()
reset([i]) convenient function to reset progress information
resume()
show_stat(count_value, max_count_value, ...)
show_stat_wrapper(count, last_count, ...)
show_stat_wrapper_multi(count, last_count, ...) call the static method show_stat_wrapper for each process
start()
stop([make_sure_its_down]) trigger clean up by hand, needs to be done when not using
class jobmanager.progress.ProgressBarFancy(count, max_count=None, width='auto', prepend=None, speed_calc_cycles=10, interval=1, verbose=0, sigint='stop', sigterm='stop', name='progress_bar', info_line=None)[source]

implements a progress bar where the color indicates the current status similar to the bars known from ‘htop’

Attributes

run makes the shared memory boolean accessible as class attribute

Methods

full_minor_stat(p, tet, speed, ttg, eta, ...)
full_stat(p, tet, speed, ttg, eta, ort, ...)
get_d(s1, s2, width, lp, lps)
getpid() return the process id of the spawned process
is_alive()
join(timeout) calls join for the spawned process with given timeout
kw_bold(s, ch_after)
pause()
reduced_1_stat(p, tet, speed, ttg, eta, ort, ...)
reduced_2_stat(p, tet, speed, ttg, eta, ort, ...)
reduced_3_stat(p, tet, speed, ttg, eta, ort, ...)
reduced_4_stat(p, tet, speed, ttg, eta, ort, ...)
reset([i]) convenient function to reset progress information
resume()
show_stat(count_value, max_count_value, ...)
show_stat_wrapper(count, last_count, ...)
show_stat_wrapper_multi(count, last_count, ...) call the static method show_stat_wrapper for each process
start()
stop([make_sure_its_down]) trigger clean up by hand, needs to be done when not using
class jobmanager.progress.SIG_handler_Loop(shared_mem_run, sigint, sigterm, identifier, verbose=0)[source]

class to setup signal handling for the Loop class

Note: each subprocess receives the default signal handling from its parent. If the signal function from the signal module is invoked within the subprocess, this default behavior can be overwritten.

The init function receives a shared memory boolean object which will be set to False in case a signal is detected. Since the Loop class checks the state of this boolean object before each repetition, the loop will stop when a signal is received.

Methods

set_signal(sig, handler_str)
jobmanager.progress.get_terminal_size(defaultw=80)[source]

Checks various methods to determine the terminal size

Methods:

  • shutil.get_terminal_size (only Python 3)
  • fcntl.ioctl
  • subprocess.check_output
  • os.environ

Parameters:

defaultw : int

Default width of terminal.

Returns:

width, height : int

Width and height of the terminal. If one of them could not be determined, None is returned in its place.

jobmanager.progress.humanize_speed(c_per_sec)[source]

convert a speed in counts per second to counts per [s, min, h, d], choosing the smallest value greater than zero.

jobmanager.progress.humanize_time(secs)[source]

convert seconds into hh:mm:ss format
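
A small usage sketch (the exact output strings are up to the library; the comments only indicate the kind of result to expect):

from jobmanager.progress import humanize_speed, humanize_time

print(humanize_speed(0.02))  # counts/s rescaled, e.g. to counts per minute
print(humanize_time(3661))   # seconds rendered in hh:mm:ss style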

jobmanager.progress.terminal_reserve(progress_obj, terminal_obj=None, verbose=0, identifier=None)[source]

Registers the terminal (stdout) for printing.

Useful to prevent multiple processes from writing progress bars to stdout.

The idea is that one process (the server) prints to stdout while the subprocesses do not, because the server has reserved it. Of course, the clients have to cooperate and check with terminal_reserve first whether they may print; nothing is actually locked.

Returns:

True if reservation was successful (or if we have already reserved this tty),

False if there already is a reservation from another instance.

jobmanager.progress.terminal_unreserve(progress_obj, terminal_obj=None, verbose=0, identifier=None)[source]

Unregisters the terminal (stdout) for printing.

an instance (progress_obj) can only unreserve the tty (terminal_obj) if it also reserved it

see terminal_reserve for more information

Returns: None

User Convenience

Decorators

Implements decorators/wrappers for simple use-cases of jobmanager.

class jobmanager.decorators.Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None, authkey=None)[source]

A progressbar-decorated version of multiprocessing.Pool. It is possible to set the authkey argument manually. The authkey is saved as the property Pool().authkey.

The methods map and map_async work as expected.

Class which supports an async version of applying functions to arguments.

Methods

map(*args, **kwargs)
map_async(func, iterable[, chunksize, callback]) Asynchronous equivalent of map() builtin.
map_async(func, iterable, chunksize=None, callback=None)[source]

Asynchronous equivalent of map() builtin. Note that chunksize and callback have no effect.
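
A minimal usage sketch (the square function is illustrative; map is expected to display a progress bar while the work is distributed):

import jobmanager as jm

def square(x):
    return x * x

if __name__ == "__main__":
    p = jm.decorators.Pool(processes=2)
    print(p.map(square, range(10)))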

class jobmanager.decorators.ProgressBar(func, **kwargs)[source]

A wrapper/decorator with a text-based progress bar.

Methods: - __init__ - __call__

The idea is to add a status bar for a regular function just by wrapping the function via python’s decorator syntax.

In order to do so, the function needs to provide some extra information, namely the current state ‘count’ and the final state ‘max_count’. Simply extend your function by these two additional keyword arguments (or other pairs specified in jobmanager.validCountKwargs) and set their values during the calculation (see example 1 below). In that manner, the decorated as well as the undecorated function can simply be called as if one did not care about any status information.

Alternatively, one can explicitly set count and max_count in the function call, which circumvents the need to change the value of max_count AFTER instantiation of the ProgressBar.

Notes

You can also use this class as a wrapper and tune parameters of the progress bar.

>>> wrapper = ProgressBar(my_func, interval=.1)
>>> result = wrapper("wrapped function", kwarg=" test")

Methods

__call__(*args, **kwargs) Calls func - previously defined in __init__.
class jobmanager.decorators.ProgressBarExtended(func, **kwargs)[source]

extends the ProgressBar such that the progress bar can be turned off by passing the extra argument ‘progress_bar_off’ with value True.

Furthermore, an additional argument named ‘progress_bar’ is passed to the function, which allows stopping the progress bar from within the function. Note that there will be a function signature error if the function does not accept the extra argument ‘progress_bar’, so a general **kwargs at the end of the function’s arguments helps. That is also the reason why the extended version comes as an extra class: it might otherwise break compatibility.
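
A minimal sketch based on the description above (the jmc/jmm keyword pair follows the convention from jobmanager.jobmanager.validCountKwargs; the workload is illustrative):

import multiprocessing as mp
import time
import jobmanager as jm

def UnsignedIntValue(val=0):
    return mp.Value('I', val, lock=True)

@jm.decorators.ProgressBarExtended
def crunch(n, jmc=UnsignedIntValue(), jmm=UnsignedIntValue(), **kwargs):
    # **kwargs swallows the extra 'progress_bar' argument
    jmm.value = n
    for i in range(n):
        jmc.value = i + 1
        time.sleep(0.1)

crunch(20)                          # with progress bar
crunch(20, progress_bar_off=True)   # progress bar turned off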

Methods

__call__(*args, **kwargs)
jobmanager.decorators.decorate_module_ProgressBar(module, decorator=<class 'jobmanager.decorators.ProgressBar'>, **kwargs)[source]

Decorates all decoratable functions in a module with a ProgressBar.

You can prevent the wrapping of a function by not specifying the keyword arguments as defined in jobmanager.jobmanager.validCountKwargs or by defining a function `_jm_decorate_{func}`.

Parameters:

module : Python module

The module whose functions should be decorated.

decorator : class

Specifies the decorator from jobmanager.decorators that should be used.

**kwargs : dict

Keyword arguments to the ProgressBar.

Notes

Decorating nested functions in a module might lead to unexpected behavior.
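
A usage sketch (mymodule is a hypothetical module whose functions accept the count keyword pairs defined in validCountKwargs):

import jobmanager as jm
import mymodule  # hypothetical module with jmc/jmm-aware functions

# wrap every decoratable function in mymodule with a ProgressBar
jm.decorators.decorate_module_ProgressBar(mymodule, interval=0.5)

mymodule.long_running_task()  # hypothetical function, now shows a progress bar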

ODE wrapper

jobmanager.ode_wrapper.wrap_complex_intgeration(f_complex)[source]

If f: R x C^n -> C^n, then this function returns the real equivalent f_prime: R x R^n x R^n -> R^n x R^n,

such that a complex vector

cc = [vc_1, ..., vc_n]

translates to

cr = [RE(vc_1), ..., RE(vc_n), IM(vc_1), ..., IM(vc_n)]
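
A sketch of the complex-to-real mapping described above (numpy is used for illustration only; wrap_complex_intgeration itself wraps the derivative function f):

import numpy as np

cc = np.array([1 + 2j, 3 + 4j])          # complex state vector
cr = np.concatenate([cc.real, cc.imag])  # -> [1., 3., 2., 4.]
cc_back = cr[:2] + 1j * cr[2:]           # inverse mapping recovers cc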

Core modules

jobmanager

jobmanager module

Richard Hartmann 2014

This module provides an easy way to implement distributed computing based on the python class SyncManager for remote communication and the python module multiprocessing for local parallelism.

class SIG_handler_Loop

The class Loop provides a mechanism to spawn a process that repeatedly calls a certain function, as well as a StatusBar class for the terminal.

class StatusBar

The class JobManager_Server provides a server process handling the following tasks:

  • providing a list (queue) of arguments to be processed by client processes (see put_arg and args_from_list)
  • handling the results of the calculations done by the client processes (see process_new_result)
  • when finished (all provided arguments have been processed and their results returned), processing the obtained results (see process_final_result)

The class JobManager_Client

class jobmanager.jobmanager.JobManager_Client(server, authkey, port=42524, nproc=0, njobs=0, nice=19, no_warnings=False, verbose=1, show_statusbar_for_jobs=True, show_counter_only=False, interval=0.3)[source]

Calls the function self.func with arguments fetched from the job_q. You should subclass this class and overwrite func to handle your own function.

The job_q is provided by a SyncManager that connects to the SyncManager set up by the JobManager_Server.

Spawns nproc subprocesses (__worker_func) to process arguments. Each subprocess gets an argument from the job_q, processes it and puts the result to the result_q.

If the job_q is empty, terminate the subprocess.

If any failure is detected within the try/except clause, the argument that just failed to process, the error, and the hostname are put to the fail_q so the JobManager_Server can take care of that.

After that the traceback is written to a file with name traceback_args_<args>_err_<err>_<YYYY>_<MM>_<DD>_<hh>_<mm>_<ss>_<PID>.trb.

Then the process will terminate.

Attributes

connected

Methods

connect()
func(arg, const_arg) function to be called by the worker processes
get_manager_objects()
start() starts nproc subprocesses to work on the job_q
static func(arg, const_arg)[source]

function to be called by the worker processes

arg - provided by the job_q of the JobManager_Server

const_arg - tuple of constant arguments also provided by the JobManager_Server

To give status information to the Client class, use the variables (c, m) as additional parameters. c and m will be multiprocessing.sharedctypes.Synchronized objects with an underlying unsigned int. Set c.value to the current status of the operation and m.value to the final status; at the end of the operation c.value should equal m.value.

NOTE: This is just some dummy implementation to be used for test reasons only! Subclass and overwrite this function to implement your own function.
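
A minimal sketch of a subclass overriding func and reporting status via (c, m); the squaring workload and the assumption that arg is a list of numbers are purely illustrative:

import jobmanager as jm

class SquaringClient(jm.JobManager_Client):
    @staticmethod
    def func(arg, const_arg, c, m):
        m.value = len(arg)        # final status
        result = []
        for i, x in enumerate(arg):
            result.append(x ** 2)
            c.value = i + 1       # current status
        return result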

start()[source]

starts nproc subprocesses to work on the job_q

SIGTERM and SIGINT are managed to terminate all subprocesses

returns when all subprocesses have terminated

class jobmanager.jobmanager.JobManager_Server(authkey, const_arg=None, port=42524, verbose=1, msg_interval=1, fname_dump='auto', speed_calc_cycles=50)[source]

general usage:

  • init the JobManager_Server, start the SyncManager server process
  • pass the arguments to be processed to the JobManager_Server (put_arg, args_from_list)
  • start the JobManager_Server (start), which means waiting for incoming results and processing them; afterwards, process all obtained data

The default behavior of handling each incoming new result is to simply add the pair (arg, result) to the final_result list.

When finished, the default final processing is to dump the final_result list to fname_for_final_result_dump.

To change this behavior you may subclass the JobManager_Server and implement

  • an extended __init__ to change the type of the final_result attribute
  • process_new_result
  • process_final_result(self)

In case of any exceptions the JobManager_Server will call process_final_result and dump the unprocessed job_q as a list to fname_for_job_q_dump.

The signal SIGTERM is also caught. In such a case a SystemExit exception is raised, which will then be handled in the try/except clause.

SystemExit and KeyboardInterrupt exceptions are not considered as failure. They are rather methods to shut down the Server gracefully. Therefore in such cases no traceback will be printed.

All other exceptions are probably due to some failure in the function. A traceback is printed to stderr.

notes:

  • when the JobManager_Server gets killed (SIGKILL) while the SyncManager still lives, the port used will remain occupied. Consider sudo netstat -pna | grep 42524 to find the process still using the port.
  • the SyncManager also ignores SIGTERM and SIGINT signals, so you have to send a SIGKILL.

Attributes

numjobs
numresults

Methods

args_from_list(args) serialize a list of arguments to the job_q
process_final_result() to implement user defined final processing
process_new_result(arg, result) Will be called when the result_q has data available.
put_arg(a) add argument a to the job_q
read_old_state([fname_dump])
show_statistics()
shutdown() stop all spawned processes and clean up
start() starts to loop over incoming results
static_load(f)
args_from_list(args)[source]

serialize a list of arguments to the job_q

process_final_result()[source]

to implement user defined final processing

process_new_result(arg, result)[source]

Will be called when the result_q has data available. result is the computed result to the argument arg.

Should be overwritten by subclassing!

put_arg(a)[source]

add argument a to the job_q

shutdown()[source]

stop all spawned processes and clean up

  • call process_final_result to handle all collected results
  • if the job_q is not empty, dump the remaining job_q
start()[source]

starts to loop over incoming results

When finished, or on exception, call stop() afterwards to shut down gracefully.

jobmanager.jobmanager.getDateForFileName(includePID=False)[source]

returns the current date-time and optionally the process id in the format YYYY_MM_DD_hh_mm_ss_pid

clients

The clients module

This module provides special subclasses of the JobManager_Client.

class jobmanager.clients.Integration_Client_CPLX(**kwargs)[source]

A JobManager_Client subclass to integrate a set of complex-valued ODEs.

‘arg’ and ‘const_arg’ are understood as keyword arguments in order to call ode_wrapper.integrate_cplx. They are passed to merge_arg_and_const_arg in order to separate the kwargs needed by ode_wrapper.integrate_cplx from the args (as a tuple) passed to the function calculating the derivative of the ODE. This tuple of parameters itself is passed to ode_wrapper.integrate_cplx as the special argument ‘args’.

If ‘arg’ or ‘const_arg’ provide the attribute ‘_asdict’ it will be called in order to construct dictionaries and use them for further processing.

The following keys MUST be provided by ‘arg’ or ‘const_arg’:

t0 : initial time
t1 : final time
N : number of time steps for the solution x(t), t = linspace(t0, t1, N)
f : function holding the derivatives
args : additional positional arguments passed to f(t, x, *args)
x0 : initial value
integrator : type of integration method

  • 'zvode': complex version of vode; in case of a stiff ODE, f needs to be analytic (see also scipy.integrate.ode -> 'zvode'); most efficient for complex ODEs
  • 'vode', 'lsoda': both do an automatic conversion from the complex ODE to the doubly dimensioned real system of ODEs and use the corresponding real integrator methods. Might be on the order of one magnitude slower than 'zvode'; consider using Integration_Client_REAL in the first place.

Optional keys are:

verbose : default 0
integrator-related arguments (see the scipy documentation on ode)

The key ‘args’ itself (should be a tuple) will be merged as kwargs[‘args’] = arg[‘args’] + const_arg[‘args’], which means that the call signature of f has to be f(t, x, arg_1, arg_2, ..., const_arg_1, const_arg_2, ...).
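
A hypothetical payload illustrating the required keys (f_deriv and all parameter values are made up; the merge behavior follows merge_arg_and_const_arg below):

import numpy as np

def f_deriv(t, x, a, b):
    return -a * x + b

const_arg = {'f': f_deriv,
             't0': 0.0, 't1': 10.0, 'N': 100,
             'integrator': 'zvode',
             'args': (0.5 + 0j,)}   # constant tail of f's extra arguments
arg = {'x0': np.array([1.0 + 0j]),
       'args': (2.0,)}              # per-job part, merged in front

# after merging, f_deriv is called as f_deriv(t, x, 2.0, 0.5+0j)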

Attributes

connected

Methods

func(arg, const_arg, c, m)
class jobmanager.clients.Integration_Client_REAL(**kwargs)[source]

A JobManager_Client subclass to integrate a set of real-valued ODEs.

same behavior as described for Integration_Client_CPLX, except that ‘vode’ and ‘lsoda’ do not do any wrapping, so there is no performance issue, and ‘zvode’ is obviously not supported.

Attributes

connected

Methods

func(arg, const_arg, c, m)
jobmanager.clients.merge_arg_and_const_arg(arg, const_arg)[source]

prepares data from arg and const_arg such that they can be passed to the general integration routine

arg and const_arg are both assumed to be dictionaries

the merge process must not alter arg or const_arg, in order to be usable in the jobmanager context

returns the arguments passed to the function defining the derivative such that args_dgl = arg[‘args’] + const_arg[‘args’], where arg[‘args’] and const_arg[‘args’] are assumed to be tuples

e.g.

arg['args'] = (2, 'low')
const_arg['args'] = (15, np.pi)

f will be called with f(t, x, 2, 'low', 15, np.pi)

further returns the combined dictionary arg + const_arg with the keyword ‘args’ removed

For any duplicate keys the value will be the value from the ‘arg’ dictionary.

servers

jobmanager.servers.recursive_scan_for_instance(obj, type, explicit_exclude=None)[source]

try to do some recursive check to see whether ‘obj’ is of type ‘type’ or contains items of ‘type’ type.

if obj is a mapping (like dict), this will only check the items iterated over via

for item in obj

which corresponds to the keys in the dict case.

The explicit_exclude argument may be a tuple of types for some explicit checking, in the sense that if obj is an instance of one of the types given by explicit_exclude, we know it is NOT an instance of type.
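
A usage sketch (assuming the function returns a boolean; the nested payload is illustrative):

import numpy as np
from jobmanager.servers import recursive_scan_for_instance

payload = [1, (2.0, [np.array([3.0])])]
# check whether the nested payload contains a numpy array,
# explicitly excluding strings from the check
contains_array = recursive_scan_for_instance(payload, type=np.ndarray,
                                             explicit_exclude=(str,))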

persistentData

class jobmanager.persistentData.PersistentDataStructure(name, path='./', verbose=1)[source]

Note: avoid using pickled dictionaries as binary keys! The problem with dicts is that the order of the keys, when returned as a list, depends on the hash values of the keys. If the keys are strings, the hash value will be randomly seeded for each python session, which may lead to different binary representations of the same dict. Therefore the same dict may actually be considered as distinct keys.

The same holds true when using classes with the default pickler routine as binary keys (because the pickler will essentially pickle the dictionary self.__dict__). If you want to use “complicated” python objects as binary keys, make sure you implement your own pickle behavior without the need for dictionaries.
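
A minimal usage sketch based on the methods listed below (whether __init__ already opens the database is not documented here, so open() is called explicitly; the string key is illustrative):

from jobmanager.persistentData import PersistentDataStructure

pds = PersistentDataStructure(name='example', path='./', verbose=0)
pds.open()
pds.setData(key='answer', value=42)
pds.setData(key='answer', value=43, overwrite=True)  # update needs overwrite=True
print(pds.getData('answer'))
pds.close()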

Methods

clear() delete all entries from the db
close() close the sqlitedict and therefore the SQL database
erase() removes the database file from the disk
getData(key[, create_sub_data])
has_key(key)
is_closed()
is_open()
is_subdata(key)
need_open()
newSubData(key) if key is not in database
open() open the SQL database at self._filename = <path>/__<name>/<name>.db
setData(key, value[, overwrite]) write the key value pair to the data base
setDataFromSubData(key, subData) set an entry of the PDS with data from an other PDS
show_stat([recursive, prepend])
clear()[source]

delete all entries from the db

close()[source]

close the sqlitedict and therefore the SQL database

erase()[source]

removes the database file from the disk

this is called recursively for all sub-PersistentDataStructures

newSubData(key)[source]

if key is not in the database, create a new database (sqlitedict) which can be queried from this one via the specified key

this will automatically create a new file where the filename is internally managed (simple increasing number)

open()[source]

open the SQL database at self._filename = <path>/__<name>/<name>.db as sqlitedict

setData(key, value, overwrite=False)[source]

write the key-value pair to the database

if the key already exists, overwrite must be set to True in order to update the data for that key in the database

setDataFromSubData(key, subData)[source]

set an entry of the PDS with data from another PDS

this means copying the appropriate file to the right place and renaming it

Examples

simple example

Download: ../examples/simple_example.py.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import division, print_function

import multiprocessing as mp
import numpy as np
from os.path import split, dirname, abspath
import sys
import time

# Add parent directory to beginning of path variable
sys.path = [split(dirname(abspath(__file__)))[0]] + sys.path

import jobmanager as jm



class Example_Client(jm.JobManager_Client):
    def __init__(self):
        # start quiet client (verbose=0)
        super(Example_Client, self).__init__(server="localhost", 
                         authkey='simple example', 
                         verbose=0)
        
    @staticmethod
    def func(args, const_args):
        """simply return the current argument"""
        return args

        

class Example_Server(jm.JobManager_Server):
    def __init__(self):
        # server show status information (verbose=1)
        super(Example_Server, self).__init__(authkey='simple example',
                         verbose=1)

        self.final_result = 1
            
        
    def process_new_result(self, arg, result):
        """over write final_result with the new incoming result 
        if the new result is smaller then the final_result""" 
        if self.final_result > result:
            self.final_result = result
            
    def process_final_result(self):
        print("final_result:", self.final_result)
        


def run_server():
    with Example_Server() as server:
        for i in range(5000):
            server.put_arg(np.random.rand())
        server.start()
    
    
def run_client():
    client = Example_Client()
    client.start()


if __name__ == "__main__":
    p_server = mp.Process(target=run_server)
    p_server.start()
    
    time.sleep(1)
    
    p_client = mp.Process(target=run_client)
    p_client.start()

    p_client.join()
    p_server.join()

decorators

Download: ../examples/wrapper_example.py.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import division, print_function
""" How to wrap or decorate a function with a progress bar.


"""

import multiprocessing as mp
from os.path import split, dirname, abspath
import sys
import time

# Add parent directory to beginning of path variable
sys.path = [split(dirname(abspath(__file__)))[0]] + sys.path

import jobmanager as jm

def UnsignedIntValue(val=0):
    return mp.Value('I', val, lock=True)


@jm.decorators.ProgressBar
def decorated_function_alpha(an_argument,
                             c=UnsignedIntValue(),
                             m=UnsignedIntValue()):
    """ A simple example of a ProgressBar-decorated function.
    
    The arguments `c` and `m` are the counter and maximal counter
    variables of the ProgressBar. They are instances of
    `multiprocessing.Value`. 
    """
    m.value=10
    c.value=0
    for i in range(10):
        # this is where things are computed
        c.value += 1
        time.sleep(.2)
    return an_argument


@jm.decorators.ProgressBar
def decorated_function_beta(an_argument,
                            jmc=UnsignedIntValue(),
                            jmm=UnsignedIntValue()):
    """ A simple example of a ProgressBar-decorated function.
    
    In comparison to `decorated_function_alpha`, we now have the
    arguments `jmc` and `jmm`. Jobmanager automatically detects
    arguments that are registered in
    `jobmanager.jobmanager.validCountKwargs`.

    Note that we do not need to set the value of jmc to zero, as the 
    ProgressBar initializes the variable to zero.
    """
    jmm.value=10

    for i in range(10):
        # this is where things are computed
        jmc.value += 1
        time.sleep(.2)
    return an_argument


@jm.decorators.ProgressBar
def decorated_function_gamma(arg, kwarg="2",
                            jmc=UnsignedIntValue(),
                            jmm=UnsignedIntValue()):
    """ A simple example of a ProgressBar-decorated function.
    
    In comparison to `decorated_function_alpha`, we now have the
    arguments `jmc` and `jmm`. Jobmanager automatically detects
    arguments that are registered in
    `jobmanager.jobmanager.validCountKwargs`.

    Note that we do not need to set the value of jmc to zero, as the 
    ProgressBar initializes the variable to zero.
    """
    jmm.value=10

    for i in range(10):
        # this is where things are computed
        jmc.value += 1
        time.sleep(.2)
    return "{} {}".format(arg, kwarg)


def wrapped_function_beta(an_argument, jmc=None, jmm=None):
    """ A simple example of a ProgressBar-decorated function.
    
    In comparison to `decorated_function_beta`, the count arguments
    became keyword arguments. The function works with and without
    the ProgressBar.
    """
    if jmm is not None:
        jmm.value=10

    for i in range(10):
            # this is where things are computed
        if jmc is not None:
            jmc.value += 1
        time.sleep(.2)
    return an_argument



if __name__ == "__main__":
    ## decorated
    retd1 = decorated_function_alpha("argument")
    retd2 = decorated_function_beta("argument")
    retd3 = decorated_function_gamma("argument", kwarg="test")
    ## wrapped
    # When using the wrapper, you can define arguments for
    # `jm.progress.ProgressBar`.
    pb = jm.decorators.ProgressBarOverrideCount(wrapped_function_beta,
                                   interval=.05)
    retw1 = pb("argument")
    # or
    retw2 = jm.decorators.ProgressBarOverrideCount(wrapped_function_beta)("arg")
    
    print(retd1, retd2, retd3, sep=" | ")
    print(retw1, retw2, sep=" | ")