#!/usr/bin/env python2
# -*- coding: utf-8 -*-
# It script it publish under GNU GENERAL PUBLIC LICENSE
# http://www.gnu.org/licenses/gpl-3.0.en.html
# Author: Jérôme ORNECH alias "Tuux" <tuxa@rtnp.org> all rights reserved
__author__ = 'Tuux'
import os
import sys
import subprocess
import locale
from collections import OrderedDict
import re
import string
from utils import Utils
import fcntl
import select
[docs]class TaskSpooler(object):
"""
Try to implement the original taskspooler capability with Python.
It's a pure Binding, where original TaskSpooler is use in background.
"""
# Here original TaskSpooler 1.0 help:
# usage: tsp [action] [-ngfmdE] [-L <lab>] [-D <id>] [cmd...]
# Env vars:
# TS_SOCKET the path to the unix socket used by the ts command.
# TS_MAILTO where to mail the result (on -m). Local user by default.
# TS_MAXFINISHED maximum finished jobs in the queue.
# TS_MAXCONN maximum number of ts connections at once.
# TS_ONFINISH binary called on job end (passes jobid, error, outfile, command).
# TS_ENV command called on enqueue. Its output determines the job information.
# TS_SAVELIST filename which will store the list, if the server dies.
# TS_SLOTS amount of jobs which can run at once, read on server start.
# TMPDIR directory where to place the output files and the default socket.
# Actions:
# -K kill the task spooler server
# -C clear the list of finished jobs
# -l show the job list (default action)
# -S [num] get/set the number of max simultaneous jobs of the server.
# -t [id] "tail -n 10 -f" the output of the job. Last run if not specified.
# -c [id] like -t, but shows all the lines. Last run if not specified.
# -p [id] show the pid of the job. Last run if not specified.
# -o [id] show the output file. Of last job run, if not specified.
# -i [id] show job information. Of last job run, if not specified.
# -s [id] show the job state. Of the last added, if not specified.
# -r [id] remove a job. The last added, if not specified.
# -w [id] wait for a job. The last added, if not specified.
# -k [id] send SIGTERM to the job process group. The last run, if not specified.
# -u [id] put that job first. The last added, if not specified.
# -U <id-id> swap two jobs in the queue.
# -B in case of full queue on the server, quit (2) instead of waiting.
# -h show this help
# -V show the program version
# Options adding jobs:
# -n don't store the output of the command.
# -E Keep stderr apart, in a name like the output file, but adding '.e'.
# -g gzip the stored output (if not -n).
# -f don't fork into background.
# -m send the output by e-mail (uses sendmail).
# -d the job will be run only if the job before ends well
# -D <id> the job will be run only if the job of given id ends well.
# -L <lab> name this task with a label, to be distinguished on listing.
# -N <num> number of slots required by the job (1 default).
def __init__(self):
self.utils = Utils()
self.tsp_patch = self.check_for_taskspooler()
self.remove_args = '-r'
self.state_args = '-s'
self.info_args = '-i'
self.output_args = '-o'
self.version_args = '-V'
locale.setlocale(locale.LC_ALL, '')
self.code = locale.getpreferredencoding()
self.decode = lambda b: b.decode(self.code)
self._tasks = {}
self.jobs_list = list()
[docs] def add_job(self,
command=None,
output=True,
stderr=False,
gzip=False,
fork=True,
send_email=False,
check_job_before=True,
check_job_id=None,
label='',
slot=1
):
"""
Add a job to the taskspooler server, many option are available during the add.
:param command: Command to execute with argument store inside a list
:type command: list
:param output: Store the output of the command.
:type output: bool
:param stderr: Keep stderr apart, in a name like the output file, but adding '.e'.
:type stderr: bool
:param gzip: Compress the stored output with gzip, without effect if output is False
:type gzip: bool
:param fork: True for fork precess, False for don't fork process
:type fork: bool
:param send_email: Send the output by e-mail (uses sendmail).
:type send_email: bool
:param check_job_before: The job will be run only if the job before ends well
:type check_job_before: bool
:param check_job_id: The job will be run only if the job of given id ends well.
:type check_job_id: int or None
:param label: Name the task with a label, to be distinguished on listing.
:type label: str
:param slot: Number of slots required by the job (1 default).
:type slot: int
:return: the job id given to the job
:rtype: int or None if can't
:raise TypeError: When output is not a bool
:raise TypeError: When stderr is not a bool
:raise TypeError: When gzip is not a bool
:raise TypeError: When fork is not a bool
:raise TypeError: When send_email is not a bool
:raise TypeError: When check_job_before is not a bool
:raise TypeError: When check_job_id is not a int or None
:raise TypeError: When label is not a str or None
:raise TypeError: When slot is not a int
"""
if self.get_path() is None:
return None
if type(output) != bool:
raise TypeError('"output" parameter must be bool type')
if type(stderr) != bool:
raise TypeError('"stderr" parameter must be bool type')
if type(gzip) != bool:
raise TypeError('"gzip" parameter must be bool type')
if type(fork) != bool:
raise TypeError('"fork" parameter must be bool type')
if type(send_email) != bool:
raise TypeError('"send_email" parameter must be bool type')
if type(check_job_before) != bool:
raise TypeError('"check_job_before" parameter must be bool type')
if check_job_id is not None and type(check_job_id) != int:
raise TypeError('"check_job_id" parameter must be int type or None')
if label != '' and type(label) != str:
raise TypeError('"label" parameter must be str type or None')
if type(slot) != int:
raise TypeError('"slot" parameter must be int type')
if type(command) != list:
raise TypeError('"command" parameter must be list type')
cmd = self.get_add_job_cmd(
check_job_before=check_job_before,
check_job_id=check_job_id,
command=command,
fork=fork,
gzip=gzip,
label=label,
output=output,
send_email=send_email,
slot=slot,
stderr=stderr
)
try:
# start subprocess
output = subprocess.check_output(cmd)
if output:
return int(output.strip())
except subprocess.CalledProcessError:
return None
[docs] def get_add_job_cmd(self,
command=None,
output=True,
stderr=False,
gzip=False,
fork=True,
send_email=False,
check_job_before=False,
check_job_id=None,
label='',
slot=1
):
"""
Generate a complex command line, and return it as list.
:param command: Command to execute with argument store inside a list
:type command: list
:param output: Store the output of the command.
:type output: bool
:param stderr: Keep stderr apart, in a name like the output file, but adding '.e'.
:type stderr: bool
:param gzip: Compress the stored output with gzip, without effect if output is False
:type gzip: bool
:param fork: True for fork precess, False for don't fork process
:type fork: bool
:param send_email: Send the output by e-mail (uses sendmail).
:type send_email: bool
:param check_job_before: The job will be run only if the job before ends well
:type check_job_before: bool
:param check_job_id: The job will be run only if the job of given id ends well.
:type check_job_id: int or None
:param label: Name the task with a label, to be distinguished on listing.
:type label: str
:param slot: Number of slots required by the job (1 default).
:type slot: int
:return: taskspooler server command line
:rtype: list
"""
# prepare the command line
cmd = list()
cmd.append(unicode(self.get_path(), 'utf-8'))
# -n don't store the output of the command.
if not output:
cmd.append(unicode("-n", 'utf-8'))
# -E Keep stderr apart, in a name like the output file, but adding '.e'.
if stderr:
cmd.append(unicode("-E", 'utf-8'))
# -g gzip the stored output (if not -n).
if output and gzip:
cmd.append(unicode("-g", 'utf-8'))
# -f don't fork into background.
if not fork:
cmd.append(unicode("-f", 'utf-8'))
# -m send the output by e-mail (uses sendmail).
if send_email:
cmd.append(unicode("-m", 'utf-8'))
# -d the job will be run only if the job before ends well
if check_job_before:
cmd.append(unicode("-d", 'utf-8'))
# -D <id> the job will be run only if the job of given id ends well.
if check_job_id:
cmd.append(unicode("-D", 'utf-8'))
cmd.append(unicode(str(check_job_id), 'utf-8'))
# -L <lab> name this task with a label, to be distinguished on listing.
if label != '':
cmd.append(unicode("-L", 'utf-8'))
cmd.append(unicode(str(label), 'utf-8'))
# -N <num> number of slots required by the job (1 default).
if slot:
cmd.append(unicode("-N", 'utf-8'))
cmd.append(unicode(str(slot), 'utf-8'))
# add the thing to execute that because ...
cmd += command
# finally return the command line
return cmd
[docs] def check_for_taskspooler(self, app_name='tsp'):
"""
Check for is **tsp** application is available and return it absolute path. If **tsp** is not found
the taskspooler can't be initialize and raise a error.
:param app_name: The name of taskspooler application, it's here for permit to test the function
:type app_name: str
:return: Absolute path of tsp application or None if not found
:rtype: str or None
:raise SystemError: When **tsp** can't be found
"""
if not self.utils.which(app_name):
raise SystemError('A application is require for enable queue feature, please install "task-spooler" '
'or verify it is available on your $PATH env var')
else:
return self.utils.which(app_name)
[docs] def get_path(self):
"""
Return the executable **tsp** file found in PATH environement variable.
Normally TaskSpooler.get() can't return None that because, TaskSpooler.check_for_taskspooler() have normaly
raise a error, during the initialization of the TaskSpooler Class.
:return: the taskspooler absolute path
:rtype: str
"""
return self.tsp_patch
[docs] def kill_the_task_spooler_server(self):
"""
Implement the capability to kill the task spooler server the TaskSpooler.
It stop every running and queued task's, without any output.
-K kill the task spooler server
:return: None
:rtype: None
"""
if self.get_path() is None:
return None
# prepare the command line
cmd = list()
cmd.append(unicode(self.get_path(), 'utf-8'))
cmd.append(unicode("-K", 'utf-8'))
# start subprocess
output = subprocess.check_output(cmd)
# here we can do something
if output:
sys.stdout.write(output.strip())
sys.stdout.write('\n')
sys.stdout.flush()
else:
return None
[docs] def clear_the_list_of_finished_jobs(self):
"""
Implement the capability to clear finished jobs
-C clear the list of finished jobs
:return: None
:rtype: None
"""
if self.get_path() is None:
return None
# prepare the command line
cmd = list()
cmd.append(unicode(self.get_path(), 'utf-8'))
cmd.append(unicode("-C", 'utf-8'))
# start subprocess
output = subprocess.check_output(cmd)
# here we can do something
if output:
sys.stdout.write(output.strip())
sys.stdout.write('\n')
sys.stdout.flush()
else:
return None
[docs] def show_the_job_list(self):
"""
Display the job list, if you want the job list see TaskSpooler.get_jobs_list()
-l show the job list (default action)
:return: None
"""
sys.stdout.write('ID State Output E-Level Times(r/u/s) Command')
sys.stdout.write('\n')
for job in self.get_jobs_list():
sys.stdout.write(str(job['ID']))
sys.stdout.write(' ')
sys.stdout.write(str(job['State']))
sys.stdout.write(' ')
sys.stdout.write(str(job['Output']))
sys.stdout.write(' ')
sys.stdout.write(str(job['E-Level']))
sys.stdout.write(' ')
sys.stdout.write(str(job['Times']['r']))
sys.stdout.write('/')
sys.stdout.write(str(job['Times']['u']))
sys.stdout.write('/')
sys.stdout.write(str(job['Times']['s']))
sys.stdout.write(' ')
sys.stdout.write(str(job['Command']))
sys.stdout.write('\n')
sys.stdout.flush()
[docs] def get_jobs_list(self):
"""
Return the jobs list, with python format list. That function don't return a class attribute value,
it request the TaskSpooler server with -l option each time you call it function.
TaskSpooler.show_the_job_list() will use it function for get task list.
each item of it list contain a Dictionary with special keynames: ID, State, Output, E-Level, Times, Command
Times key contain a second dictionary level with keynames: r, s, u
:return: TaskSpooler server job list
:rtype: list
"""
if self.get_path() is None:
return None
# prepare the command line
cmd = list()
cmd.append(unicode(self.get_path(), 'utf-8'))
cmd.append(unicode("-l", 'utf-8'))
# start subprocess
output = subprocess.check_output(cmd)
returned_list = list()
if output:
# One job by line
output = output.split('\n')
for line in output:
# generate a dictionary for each line
job = dict()
# start by split each line with all it is more of 2 spaces
tmp_list = re.split(r'\s{2,}', line)
# then we have 2 return cases only
if len(tmp_list) == 4 or len(tmp_list) == 5:
if len(tmp_list) == 4:
job['ID'] = tmp_list[0]
job['State'] = tmp_list[1]
job['Output'] = tmp_list[2]
job['E-Level'] = ''
job['Times'] = ''
job['Command'] = tmp_list[3]
if len(tmp_list) == 5:
job['ID'] = tmp_list[0]
job['State'] = tmp_list[1]
job['Output'] = tmp_list[2]
job['E-Level'] = tmp_list[3]
job['Times'], job['Command'] = tmp_list[4].split(' ', 1)
job['Times'] = job['Times'].split('/')
job['Times'] = {
'r': job['Times'][0],
'u': job['Times'][1],
's': job['Times'][2]
}
# increase the returned_list
returned_list.append(job)
return returned_list
[docs] def get_number_of_simultaneous_jobs(self):
"""
Return the number of max simultaneous jobs of the server.
-S [num] get/set the number of max simultaneous jobs of the server.
:return: the actual value
:rtype: int or None if can't get
"""
if self.get_path() is None:
return None
# prepare the command line
cmd = list()
cmd.append(unicode(self.get_path(), 'utf-8'))
cmd.append(unicode("-S", 'utf-8'))
# start subprocess
output = subprocess.check_output(cmd)
# here we can do something
if output:
return int(output.strip())
else:
return None
[docs] def set_number_of_simultaneous_jobs(self, simultaneous_jobs=1):
"""
Set the number of max simultaneous jobs of the server.
-S [num] get/set the number of max simultaneous jobs of the server.
:return: the actual value
"""
if self.get_path() is None:
return None
if type(simultaneous_jobs) != int:
raise TypeError('"simultaneous_jobs" must be a int type')
# prepare the command line
cmd = list()
cmd.append(unicode(self.get_path(), 'utf-8'))
cmd.append(unicode("-S", 'utf-8'))
cmd.append(unicode(str(simultaneous_jobs), 'utf-8'))
# start subprocess
subprocess.check_output(cmd)
# -t [id] "tail -n 10 -f" the output of the job. Last run if not specified.
# -c [id] like -t, but shows all the lines. Last run if not specified.
# -p [id] show the pid of the job. Last run if not specified.
[docs] def get_pid(self, job=None):
"""
Return the pid number of a job number pass as parameter
:param job: the job number
:type job: int
:return: pid
:rtype: int
"""
if self.get_path() is None:
return None
if len(self.get_jobs_list()):
if job is None:
# prepare the command line
cmd = list()
cmd.append(unicode(self.get_path(), 'utf-8'))
cmd.append(unicode("-p", 'utf-8'))
else:
if type(job) != int:
raise TypeError('"job" must be a int type')
else:
# prepare the command line
cmd = list()
cmd.append(unicode(self.get_path(), 'utf-8'))
cmd.append(unicode("-p", 'utf-8'))
cmd.append(unicode(str(job), 'utf-8'))
# start subprocess
try:
output = subprocess.check_output(cmd, stderr=subprocess.PIPE)
return int(output.strip())
except subprocess.CalledProcessError:
return None
else:
return None
# -o [id] show the output file. Of last job run, if not specified.
[docs] def get_output_file(self, job=None):
"""
Return the output file of a job number pass as parameter.
If not specified , return the last job run output file.
:param job: the job number
:type job: int
:return: output file
:rtype: str
"""
if self.get_path() is None:
return None
if len(self.get_jobs_list()):
if job is None:
# prepare the command line
cmd = list()
cmd.append(unicode(self.get_path(), 'utf-8'))
cmd.append(unicode("-o", 'utf-8'))
else:
if type(job) != int:
raise TypeError('"job" must be a int type')
else:
# prepare the command line
cmd = list()
cmd.append(unicode(self.get_path(), 'utf-8'))
cmd.append(unicode("-o", 'utf-8'))
cmd.append(unicode(str(job), 'utf-8'))
# start subprocess
try:
output = subprocess.check_output(cmd, stderr=subprocess.PIPE)
return str(output.strip())
except subprocess.CalledProcessError:
return None
else:
return None
# -i [id] show job information. Of last job run, if not specified.
# -s [id] show the job state. Of the last added, if not specified.
[docs] def get_job_state(self, job=None):
"""
Return the output file of a job number pass as parameter.
If not specified , return the last job run output file.
:param job: the job number
:type job: int
:return: output file
:rtype: str
"""
if self.get_path() is None:
return None
if len(self.get_jobs_list()):
if job is None:
# prepare the command line
cmd = list()
cmd.append(unicode(self.get_path(), 'utf-8'))
cmd.append(unicode("-s", 'utf-8'))
else:
if type(job) != int:
raise TypeError('"job" must be a int type')
else:
# prepare the command line
cmd = list()
cmd.append(unicode(self.get_path(), 'utf-8'))
cmd.append(unicode("-s", 'utf-8'))
cmd.append(unicode(str(job), 'utf-8'))
# start subprocess
try:
output = subprocess.check_output(cmd, stderr=subprocess.PIPE)
return str(output.strip())
except subprocess.CalledProcessError:
return None
else:
return None
[docs] def get_version(self):
command = [self.tsp_patch, self.version_args]
output = subprocess.check_output(command)
if output:
return output.split(" - ", 1)[0]
else:
return "Task Spooler v(unknow)"