Source code for GalaxieDrake.taskspooler

#!/usr/bin/env python2
# -*- coding: utf-8 -*-

# This script is published under the GNU GENERAL PUBLIC LICENSE
# http://www.gnu.org/licenses/gpl-3.0.en.html
# Author: Jérôme ORNECH alias "Tuux" <tuxa@rtnp.org> all rights reserved
__author__ = 'Tuux'

import os
import sys
import subprocess
import locale
from collections import OrderedDict
import re
import string
from utils import Utils
import fcntl
import select


class TaskSpooler(object):
    """
    Python binding around the original Task Spooler command line tool.

    Nothing is re-implemented here: every operation is delegated to the
    ``tsp`` executable, which keeps doing the real work in the background.
    """

    # Help text of the original TaskSpooler 1.0, kept as a reference:
    # usage: tsp [action] [-ngfmdE] [-L <lab>] [-D <id>] [cmd...]
    # Env vars:
    #   TS_SOCKET       the path to the unix socket used by the ts command.
    #   TS_MAILTO       where to mail the result (on -m). Local user by default.
    #   TS_MAXFINISHED  maximum finished jobs in the queue.
    #   TS_MAXCONN      maximum number of ts connections at once.
    #   TS_ONFINISH     binary called on job end (passes jobid, error, outfile, command).
    #   TS_ENV          command called on enqueue. Its output determines the job information.
    #   TS_SAVELIST     filename which will store the list, if the server dies.
    #   TS_SLOTS        amount of jobs which can run at once, read on server start.
    #   TMPDIR          directory where to place the output files and the default socket.
    # Actions:
    #   -K          kill the task spooler server
    #   -C          clear the list of finished jobs
    #   -l          show the job list (default action)
    #   -S [num]    get/set the number of max simultaneous jobs of the server.
    #   -t [id]     "tail -n 10 -f" the output of the job. Last run if not specified.
    #   -c [id]     like -t, but shows all the lines. Last run if not specified.
    #   -p [id]     show the pid of the job. Last run if not specified.
    #   -o [id]     show the output file. Of last job run, if not specified.
    #   -i [id]     show job information. Of last job run, if not specified.
    #   -s [id]     show the job state. Of the last added, if not specified.
    #   -r [id]     remove a job. The last added, if not specified.
    #   -w [id]     wait for a job. The last added, if not specified.
    #   -k [id]     send SIGTERM to the job process group. The last run, if not specified.
    #   -u [id]     put that job first. The last added, if not specified.
    #   -U <id-id>  swap two jobs in the queue.
    #   -B          in case of full queue on the server, quit (2) instead of waiting.
    #   -h          show this help
    #   -V          show the program version
    # Options adding jobs:
    #   -n          don't store the output of the command.
    #   -E          Keep stderr apart, in a name like the output file, but adding '.e'.
    #   -g          gzip the stored output (if not -n).
    #   -f          don't fork into background.
    #   -m          send the output by e-mail (uses sendmail).
    #   -d          the job will be run only if the job before ends well
    #   -D <id>     the job will be run only if the job of given id ends well.
    #   -L <lab>    name this task with a label, to be distinguished on listing.
    #   -N <num>    number of slots required by the job (1 default).

    def __init__(self):
        """
        Locate the ``tsp`` executable and prepare the binding state.

        :raise SystemError: When **tsp** can't be found (raised by check_for_taskspooler())
        """
        # The executable lookup comes first: nothing else is useful without it.
        self.utils = Utils()
        self.tsp_patch = self.check_for_taskspooler()

        # Command line switches stored on the instance.
        self.version_args = '-V'
        self.output_args = '-o'
        self.info_args = '-i'
        self.state_args = '-s'
        self.remove_args = '-r'

        # Adopt the user's locale and remember its preferred encoding.
        locale.setlocale(locale.LC_ALL, '')
        self.code = locale.getpreferredencoding()
        self.decode = lambda raw: raw.decode(self.code)

        # Containers, empty at start-up.
        self._tasks = {}
        self.jobs_list = []
[docs] def add_job(self, command=None, output=True, stderr=False, gzip=False, fork=True, send_email=False, check_job_before=True, check_job_id=None, label='', slot=1 ): """ Add a job to the taskspooler server, many option are available during the add. :param command: Command to execute with argument store inside a list :type command: list :param output: Store the output of the command. :type output: bool :param stderr: Keep stderr apart, in a name like the output file, but adding '.e'. :type stderr: bool :param gzip: Compress the stored output with gzip, without effect if output is False :type gzip: bool :param fork: True for fork precess, False for don't fork process :type fork: bool :param send_email: Send the output by e-mail (uses sendmail). :type send_email: bool :param check_job_before: The job will be run only if the job before ends well :type check_job_before: bool :param check_job_id: The job will be run only if the job of given id ends well. :type check_job_id: int or None :param label: Name the task with a label, to be distinguished on listing. :type label: str :param slot: Number of slots required by the job (1 default). 
:type slot: int :return: the job id given to the job :rtype: int or None if can't :raise TypeError: When output is not a bool :raise TypeError: When stderr is not a bool :raise TypeError: When gzip is not a bool :raise TypeError: When fork is not a bool :raise TypeError: When send_email is not a bool :raise TypeError: When check_job_before is not a bool :raise TypeError: When check_job_id is not a int or None :raise TypeError: When label is not a str or None :raise TypeError: When slot is not a int """ if self.get_path() is None: return None if type(output) != bool: raise TypeError('"output" parameter must be bool type') if type(stderr) != bool: raise TypeError('"stderr" parameter must be bool type') if type(gzip) != bool: raise TypeError('"gzip" parameter must be bool type') if type(fork) != bool: raise TypeError('"fork" parameter must be bool type') if type(send_email) != bool: raise TypeError('"send_email" parameter must be bool type') if type(check_job_before) != bool: raise TypeError('"check_job_before" parameter must be bool type') if check_job_id is not None and type(check_job_id) != int: raise TypeError('"check_job_id" parameter must be int type or None') if label != '' and type(label) != str: raise TypeError('"label" parameter must be str type or None') if type(slot) != int: raise TypeError('"slot" parameter must be int type') if type(command) != list: raise TypeError('"command" parameter must be list type') cmd = self.get_add_job_cmd( check_job_before=check_job_before, check_job_id=check_job_id, command=command, fork=fork, gzip=gzip, label=label, output=output, send_email=send_email, slot=slot, stderr=stderr ) try: # start subprocess output = subprocess.check_output(cmd) if output: return int(output.strip()) except subprocess.CalledProcessError: return None
[docs] def get_add_job_cmd(self, command=None, output=True, stderr=False, gzip=False, fork=True, send_email=False, check_job_before=False, check_job_id=None, label='', slot=1 ): """ Generate a complex command line, and return it as list. :param command: Command to execute with argument store inside a list :type command: list :param output: Store the output of the command. :type output: bool :param stderr: Keep stderr apart, in a name like the output file, but adding '.e'. :type stderr: bool :param gzip: Compress the stored output with gzip, without effect if output is False :type gzip: bool :param fork: True for fork precess, False for don't fork process :type fork: bool :param send_email: Send the output by e-mail (uses sendmail). :type send_email: bool :param check_job_before: The job will be run only if the job before ends well :type check_job_before: bool :param check_job_id: The job will be run only if the job of given id ends well. :type check_job_id: int or None :param label: Name the task with a label, to be distinguished on listing. :type label: str :param slot: Number of slots required by the job (1 default). :type slot: int :return: taskspooler server command line :rtype: list """ # prepare the command line cmd = list() cmd.append(unicode(self.get_path(), 'utf-8')) # -n don't store the output of the command. if not output: cmd.append(unicode("-n", 'utf-8')) # -E Keep stderr apart, in a name like the output file, but adding '.e'. if stderr: cmd.append(unicode("-E", 'utf-8')) # -g gzip the stored output (if not -n). if output and gzip: cmd.append(unicode("-g", 'utf-8')) # -f don't fork into background. if not fork: cmd.append(unicode("-f", 'utf-8')) # -m send the output by e-mail (uses sendmail). if send_email: cmd.append(unicode("-m", 'utf-8')) # -d the job will be run only if the job before ends well if check_job_before: cmd.append(unicode("-d", 'utf-8')) # -D <id> the job will be run only if the job of given id ends well. 
if check_job_id: cmd.append(unicode("-D", 'utf-8')) cmd.append(unicode(str(check_job_id), 'utf-8')) # -L <lab> name this task with a label, to be distinguished on listing. if label != '': cmd.append(unicode("-L", 'utf-8')) cmd.append(unicode(str(label), 'utf-8')) # -N <num> number of slots required by the job (1 default). if slot: cmd.append(unicode("-N", 'utf-8')) cmd.append(unicode(str(slot), 'utf-8')) # add the thing to execute that because ... cmd += command # finally return the command line return cmd
[docs] def check_for_taskspooler(self, app_name='tsp'): """ Check for is **tsp** application is available and return it absolute path. If **tsp** is not found the taskspooler can't be initialize and raise a error. :param app_name: The name of taskspooler application, it's here for permit to test the function :type app_name: str :return: Absolute path of tsp application or None if not found :rtype: str or None :raise SystemError: When **tsp** can't be found """ if not self.utils.which(app_name): raise SystemError('A application is require for enable queue feature, please install "task-spooler" ' 'or verify it is available on your $PATH env var') else: return self.utils.which(app_name)
[docs] def get_path(self): """ Return the executable **tsp** file found in PATH environement variable. Normally TaskSpooler.get() can't return None that because, TaskSpooler.check_for_taskspooler() have normaly raise a error, during the initialization of the TaskSpooler Class. :return: the taskspooler absolute path :rtype: str """ return self.tsp_patch
[docs] def kill_the_task_spooler_server(self): """ Implement the capability to kill the task spooler server the TaskSpooler. It stop every running and queued task's, without any output. -K kill the task spooler server :return: None :rtype: None """ if self.get_path() is None: return None # prepare the command line cmd = list() cmd.append(unicode(self.get_path(), 'utf-8')) cmd.append(unicode("-K", 'utf-8')) # start subprocess output = subprocess.check_output(cmd) # here we can do something if output: sys.stdout.write(output.strip()) sys.stdout.write('\n') sys.stdout.flush() else: return None
[docs] def clear_the_list_of_finished_jobs(self): """ Implement the capability to clear finished jobs -C clear the list of finished jobs :return: None :rtype: None """ if self.get_path() is None: return None # prepare the command line cmd = list() cmd.append(unicode(self.get_path(), 'utf-8')) cmd.append(unicode("-C", 'utf-8')) # start subprocess output = subprocess.check_output(cmd) # here we can do something if output: sys.stdout.write(output.strip()) sys.stdout.write('\n') sys.stdout.flush() else: return None
[docs] def show_the_job_list(self): """ Display the job list, if you want the job list see TaskSpooler.get_jobs_list() -l show the job list (default action) :return: None """ sys.stdout.write('ID State Output E-Level Times(r/u/s) Command') sys.stdout.write('\n') for job in self.get_jobs_list(): sys.stdout.write(str(job['ID'])) sys.stdout.write(' ') sys.stdout.write(str(job['State'])) sys.stdout.write(' ') sys.stdout.write(str(job['Output'])) sys.stdout.write(' ') sys.stdout.write(str(job['E-Level'])) sys.stdout.write(' ') sys.stdout.write(str(job['Times']['r'])) sys.stdout.write('/') sys.stdout.write(str(job['Times']['u'])) sys.stdout.write('/') sys.stdout.write(str(job['Times']['s'])) sys.stdout.write(' ') sys.stdout.write(str(job['Command'])) sys.stdout.write('\n') sys.stdout.flush()
[docs] def get_jobs_list(self): """ Return the jobs list, with python format list. That function don't return a class attribute value, it request the TaskSpooler server with -l option each time you call it function. TaskSpooler.show_the_job_list() will use it function for get task list. each item of it list contain a Dictionary with special keynames: ID, State, Output, E-Level, Times, Command Times key contain a second dictionary level with keynames: r, s, u :return: TaskSpooler server job list :rtype: list """ if self.get_path() is None: return None # prepare the command line cmd = list() cmd.append(unicode(self.get_path(), 'utf-8')) cmd.append(unicode("-l", 'utf-8')) # start subprocess output = subprocess.check_output(cmd) returned_list = list() if output: # One job by line output = output.split('\n') for line in output: # generate a dictionary for each line job = dict() # start by split each line with all it is more of 2 spaces tmp_list = re.split(r'\s{2,}', line) # then we have 2 return cases only if len(tmp_list) == 4 or len(tmp_list) == 5: if len(tmp_list) == 4: job['ID'] = tmp_list[0] job['State'] = tmp_list[1] job['Output'] = tmp_list[2] job['E-Level'] = '' job['Times'] = '' job['Command'] = tmp_list[3] if len(tmp_list) == 5: job['ID'] = tmp_list[0] job['State'] = tmp_list[1] job['Output'] = tmp_list[2] job['E-Level'] = tmp_list[3] job['Times'], job['Command'] = tmp_list[4].split(' ', 1) job['Times'] = job['Times'].split('/') job['Times'] = { 'r': job['Times'][0], 'u': job['Times'][1], 's': job['Times'][2] } # increase the returned_list returned_list.append(job) return returned_list
[docs] def get_number_of_simultaneous_jobs(self): """ Return the number of max simultaneous jobs of the server. -S [num] get/set the number of max simultaneous jobs of the server. :return: the actual value :rtype: int or None if can't get """ if self.get_path() is None: return None # prepare the command line cmd = list() cmd.append(unicode(self.get_path(), 'utf-8')) cmd.append(unicode("-S", 'utf-8')) # start subprocess output = subprocess.check_output(cmd) # here we can do something if output: return int(output.strip()) else: return None
[docs] def set_number_of_simultaneous_jobs(self, simultaneous_jobs=1): """ Set the number of max simultaneous jobs of the server. -S [num] get/set the number of max simultaneous jobs of the server. :return: the actual value """ if self.get_path() is None: return None if type(simultaneous_jobs) != int: raise TypeError('"simultaneous_jobs" must be a int type') # prepare the command line cmd = list() cmd.append(unicode(self.get_path(), 'utf-8')) cmd.append(unicode("-S", 'utf-8')) cmd.append(unicode(str(simultaneous_jobs), 'utf-8')) # start subprocess subprocess.check_output(cmd)
    # -t [id] "tail -n 10 -f" the output of the job. Last run if not specified.
    # -c [id] like -t, but shows all the lines. Last run if not specified.
    # -p [id] show the pid of the job. Last run if not specified.
[docs] def get_pid(self, job=None): """ Return the pid number of a job number pass as parameter :param job: the job number :type job: int :return: pid :rtype: int """ if self.get_path() is None: return None if len(self.get_jobs_list()): if job is None: # prepare the command line cmd = list() cmd.append(unicode(self.get_path(), 'utf-8')) cmd.append(unicode("-p", 'utf-8')) else: if type(job) != int: raise TypeError('"job" must be a int type') else: # prepare the command line cmd = list() cmd.append(unicode(self.get_path(), 'utf-8')) cmd.append(unicode("-p", 'utf-8')) cmd.append(unicode(str(job), 'utf-8')) # start subprocess try: output = subprocess.check_output(cmd, stderr=subprocess.PIPE) return int(output.strip()) except subprocess.CalledProcessError: return None else: return None
# -o [id] show the output file. Of last job run, if not specified.
[docs] def get_output_file(self, job=None): """ Return the output file of a job number pass as parameter. If not specified , return the last job run output file. :param job: the job number :type job: int :return: output file :rtype: str """ if self.get_path() is None: return None if len(self.get_jobs_list()): if job is None: # prepare the command line cmd = list() cmd.append(unicode(self.get_path(), 'utf-8')) cmd.append(unicode("-o", 'utf-8')) else: if type(job) != int: raise TypeError('"job" must be a int type') else: # prepare the command line cmd = list() cmd.append(unicode(self.get_path(), 'utf-8')) cmd.append(unicode("-o", 'utf-8')) cmd.append(unicode(str(job), 'utf-8')) # start subprocess try: output = subprocess.check_output(cmd, stderr=subprocess.PIPE) return str(output.strip()) except subprocess.CalledProcessError: return None else: return None
# -i [id] show job information. Of last job run, if not specified.
[docs] def get_job_information(self, job=None): """ Return the job information of a job number pass as parameter. If not specified , return the last job run information. :param job: the job number :type job: int :return: information :rtype: list """ if self.get_path() is None: return None if len(self.get_jobs_list()): if job is None: # prepare the command line cmd = list() cmd.append(unicode(self.get_path(), 'utf-8')) cmd.append(unicode("-i", 'utf-8')) else: if type(job) != int: raise TypeError('"job" must be a int type') else: # prepare the command line cmd = list() cmd.append(unicode(self.get_path(), 'utf-8')) cmd.append(unicode("-i", 'utf-8')) cmd.append(unicode(str(job), 'utf-8')) # start subprocess try: output = subprocess.check_output(cmd, stderr=subprocess.PIPE) tmp_list = output.strip().split('\n') count = 0 for item in tmp_list: tmp_list[count] = re.sub('^[0-9a-zA-Z_\s]+:\s', '', item) count += 1 return_dict = dict() if len(tmp_list) == 2: return_dict['Exit_status'] = None return_dict['Command'] = tmp_list[0] return_dict['Slots_required'] = tmp_list[1] return_dict['Enqueue_time'] = tmp_list[2] return_dict['Start_time'] = None return_dict['Time_running'] = None return_dict['End_time'] = None return_dict['Time_run'] = None if len(tmp_list) == 5: return_dict['Exit_status'] = None return_dict['Command'] = tmp_list[0] return_dict['Slots_required'] = tmp_list[1] return_dict['Enqueue_time'] = tmp_list[2] return_dict['Start_time'] = tmp_list[3] return_dict['Time_running'] = tmp_list[4] return_dict['End_time'] = None return_dict['Time_run'] = None if len(tmp_list) == 7: return_dict['Exit_status'] = tmp_list[0] return_dict['Command'] = tmp_list[1] return_dict['Slots_required'] = tmp_list[2] return_dict['Enqueue_time'] = tmp_list[3] return_dict['Start_time'] = tmp_list[4] return_dict['End_time'] = tmp_list[5] return_dict['Time_run'] = tmp_list[6] return_dict['Time_running'] = None return return_dict except subprocess.CalledProcessError: return None else: return None
# -s [id] show the job state. Of the last added, if not specified.
[docs] def get_job_state(self, job=None): """ Return the output file of a job number pass as parameter. If not specified , return the last job run output file. :param job: the job number :type job: int :return: output file :rtype: str """ if self.get_path() is None: return None if len(self.get_jobs_list()): if job is None: # prepare the command line cmd = list() cmd.append(unicode(self.get_path(), 'utf-8')) cmd.append(unicode("-s", 'utf-8')) else: if type(job) != int: raise TypeError('"job" must be a int type') else: # prepare the command line cmd = list() cmd.append(unicode(self.get_path(), 'utf-8')) cmd.append(unicode("-s", 'utf-8')) cmd.append(unicode(str(job), 'utf-8')) # start subprocess try: output = subprocess.check_output(cmd, stderr=subprocess.PIPE) return str(output.strip()) except subprocess.CalledProcessError: return None else: return None
[docs] def get_version(self): command = [self.tsp_patch, self.version_args] output = subprocess.check_output(command) if output: return output.split(" - ", 1)[0] else: return "Task Spooler v(unknow)"